hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r793457 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Date Mon, 13 Jul 2009 05:21:38 GMT
Author: sharad
Date: Mon Jul 13 05:21:37 2009
New Revision: 793457

URL: http://svn.apache.org/viewvc?rev=793457&view=rev
Log:
MAPREDUCE-467. Provide ability to collect statistics about total tasks and succeeded tasks
in different time windows.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerStatistics.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/StatisticsCollector.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestStatisticsCollector.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/webapps/job/machines.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=793457&r1=793456&r2=793457&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 13 05:21:37 2009
@@ -31,7 +31,10 @@
     MAPREDUCE-532. Provide a way to limit the number of used slots 
     per queue in the capacity scheduler.
     (Rahul Kumar Singh via yhemanth)
-    
+
+    MAPREDUCE-467. Provide ability to collect statistics about total tasks 
+    and succeeded tasks in different time windows. (sharad)
+
   IMPROVEMENTS
 
     HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=793457&r1=793456&r2=793457&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jul 13 05:21:37 2009
@@ -1043,8 +1043,16 @@
       if (taskEvent != null) {
         this.taskCompletionEvents.add(taskEvent);
         taskCompletionEventTracker++;
+        JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
+           getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
+        if(ttStat != null) { // ttStat can be null in case of lost tracker
+          ttStat.incrTotalTasks();
+        }
         if (state == TaskStatus.State.SUCCEEDED) {
           completedTask(tip, status);
+          if(ttStat != null) {
+            ttStat.incrSucceededTasks();
+          }
         }
       }
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=793457&r1=793456&r2=793457&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jul 13 05:21:37 2009
@@ -385,7 +385,7 @@
                         faultyTrackers.numBlacklistedTrackers -= 1;
                       }
                       updateTaskTrackerStatus(trackerName, null);
-                      
+                      statistics.taskTrackerRemoved(trackerName);
                       // remove the mapping from the hosts list
                       String hostname = newProfile.getHost();
                       hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -1661,6 +1661,8 @@
     
   private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
   
+  private JobTrackerStatistics statistics = 
+    new JobTrackerStatistics();
   //
   // Watch and expire TaskTracker objects using these structures.
   // We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -2614,6 +2616,9 @@
     }
   }
 
+  JobTrackerStatistics getStatistics() {
+    return statistics;
+  }
   /**
    * Adds a new node to the jobtracker. It involves adding it to the expiry
    * thread and adding it for resolution
@@ -2639,6 +2644,7 @@
       trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());
       hostnameToTaskTracker.put(hostname, trackers);
     }
+    statistics.taskTrackerAdded(status.getTrackerName());
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
     trackers.add(taskTracker);

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerStatistics.java?rev=793457&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerStatistics.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerStatistics.java Mon Jul 13 05:21:37 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+
+/**
+ * Collects the job tracker statistics.
+ *
+ */
+class JobTrackerStatistics {
+
+  final StatisticsCollector collector;
+  final Map<String, TaskTrackerStat> ttStats = 
+    new HashMap<String, TaskTrackerStat>();
+
+  JobTrackerStatistics() {
+    collector = new StatisticsCollector();
+    collector.start();
+  }
+
+  synchronized void taskTrackerAdded(String name) {
+    TaskTrackerStat stat = ttStats.get(name);
+    if(stat == null) {
+      stat =  new TaskTrackerStat(name);
+      ttStats.put(name, stat);
+    }
+  }
+
+  synchronized void taskTrackerRemoved(String name) {
+    TaskTrackerStat stat = ttStats.remove(name);
+    if(stat != null) {
+      stat.remove();
+    }
+  }
+
+  synchronized TaskTrackerStat getTaskTrackerStat(String name) {
+    return ttStats.get(name);
+  }
+
+  class TaskTrackerStat {
+    final String totalTasksKey;
+    final Stat totalTasksStat;
+
+    final String succeededTasksKey;
+    final Stat succeededTasksStat;
+
+    TaskTrackerStat(String trackerName) {
+      totalTasksKey = trackerName+"-"+"totalTasks";
+      totalTasksStat = collector.createStat(totalTasksKey);
+      succeededTasksKey = trackerName+"-"+"succeededTasks";
+      succeededTasksStat = collector.createStat(succeededTasksKey);
+    }
+
+    synchronized void incrTotalTasks() {
+      totalTasksStat.inc();
+    }
+
+    synchronized void incrSucceededTasks() {
+      succeededTasksStat.inc();
+    }
+
+    synchronized void remove() {
+      collector.removeStat(totalTasksKey);
+      collector.removeStat(succeededTasksKey);
+    }
+
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/StatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/StatisticsCollector.java?rev=793457&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/StatisticsCollector.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/StatisticsCollector.java Mon Jul 13 05:21:37 2009
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.mapred.StatisticsCollector.Stat.TimeStat;
+
+/**
+ * Collects the statistics in time windows.
+ */
+class StatisticsCollector {
+
+  private static final int DEFAULT_PERIOD = 5;
+
+  static final TimeWindow 
+    SINCE_START = new TimeWindow("Since Start", -1, -1);
+  
+  static final TimeWindow 
+    LAST_WEEK = new TimeWindow("Last Week", 7 * 24 * 60 * 60, 60 * 60);
+  
+  static final TimeWindow 
+    LAST_DAY = new TimeWindow("Last Day", 24 * 60 * 60, 60 * 60);
+  
+  static final TimeWindow 
+    LAST_HOUR = new TimeWindow("Last Hour", 60 * 60, 60);
+  
+  static final TimeWindow 
+    LAST_MINUTE = new TimeWindow("Last Minute", 60, 10);
+
+  static final TimeWindow[] DEFAULT_COLLECT_WINDOWS = {
+    StatisticsCollector.SINCE_START,
+    StatisticsCollector.LAST_DAY,
+    StatisticsCollector.LAST_HOUR
+    };
+
+  private final int period;
+  private boolean started;
+  
+  private final Map<TimeWindow, StatUpdater> updaters = 
+    new LinkedHashMap<TimeWindow, StatUpdater>();
+  private final Map<String, Stat> statistics = new HashMap<String, Stat>();
+
+  StatisticsCollector() {
+    this(DEFAULT_PERIOD);
+  }
+
+  StatisticsCollector(int period) {
+    this.period = period;
+  }
+
+  synchronized void start() {
+    if (started) {
+      return;
+    }
+    Timer timer = new Timer("Timer thread for monitoring ", true);
+    TimerTask task = new TimerTask() {
+      public void run() {
+        update();
+      }
+    };
+    long millis = period * 1000;
+    timer.scheduleAtFixedRate(task, millis, millis);
+    started = true;
+  }
+
+  protected synchronized void update() {
+    for (StatUpdater c : updaters.values()) {
+      c.update();
+    }
+  }
+
+  Map<TimeWindow, StatUpdater> getUpdaters() {
+    return Collections.unmodifiableMap(updaters);
+  }
+
+  Map<String, Stat> getStatistics() {
+    return Collections.unmodifiableMap(statistics);
+  }
+
+  synchronized Stat createStat(String name) {
+    return createStat(name, DEFAULT_COLLECT_WINDOWS);
+  }
+
+  synchronized Stat createStat(String name, TimeWindow[] windows) {
+    if (statistics.get(name) != null) {
+      throw new RuntimeException("Stat with name "+ name + 
+          " is already defined");
+    }
+    Map<TimeWindow, TimeStat> timeStats = 
+      new LinkedHashMap<TimeWindow, TimeStat>();
+    for (TimeWindow window : windows) {
+      StatUpdater collector = updaters.get(window);
+      if (collector == null) {
+        if(SINCE_START.equals(window)) {
+          collector = new StatUpdater();
+        } else {
+          collector = new TimeWindowStatUpdater(window, period);
+        }
+        updaters.put(window, collector);
+      }
+      TimeStat timeStat = new TimeStat();
+      collector.addTimeStat(name, timeStat);
+      timeStats.put(window, timeStat);
+    }
+
+    Stat stat = new Stat(name, timeStats);
+    statistics.put(name, stat);
+    return stat;
+  }
+
+  synchronized Stat removeStat(String name) {
+    Stat stat = statistics.remove(name);
+    if (stat != null) {
+      for (StatUpdater collector : updaters.values()) {
+        collector.removeTimeStat(name);
+      }
+    }
+    return stat;
+  }
+
+  static class TimeWindow {
+    final String name;
+    final int windowSize;
+    final int updateGranularity;
+    TimeWindow(String name, int windowSize, int updateGranularity) {
+      if (updateGranularity > windowSize) {
+        throw new RuntimeException(
+            "Invalid TimeWindow: updateGranularity > windowSize");
+      }
+      this.name = name;
+      this.windowSize = windowSize;
+      this.updateGranularity = updateGranularity;
+    }
+
+    public int hashCode() {
+      return name.hashCode() + updateGranularity + windowSize;
+    }
+
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      final TimeWindow other = (TimeWindow) obj;
+      if (name == null) {
+        if (other.name != null)
+          return false;
+      } else if (!name.equals(other.name))
+        return false;
+      if (updateGranularity != other.updateGranularity)
+        return false;
+      if (windowSize != other.windowSize)
+        return false;
+      return true;
+    }
+  }
+
+  static class Stat {
+    final String name;
+    private Map<TimeWindow, TimeStat> timeStats;
+
+    private Stat(String name, Map<TimeWindow, TimeStat> timeStats) {
+      this.name = name;
+      this.timeStats = timeStats;
+    }
+
+    public synchronized void inc(int incr) {
+      for (TimeStat ts : timeStats.values()) {
+        ts.inc(incr);
+      }
+    }
+
+    public synchronized void inc() {
+      inc(1);
+    }
+
+    public synchronized Map<TimeWindow, TimeStat> getValues() {
+      return Collections.unmodifiableMap(timeStats);
+    }
+
+    static class TimeStat {
+      private final LinkedList<Integer> buckets = new LinkedList<Integer>();
+      private int value;
+      private int currentValue;
+
+      public synchronized int getValue() {
+        return value;
+      }
+
+      private synchronized void inc(int i) {
+        currentValue += i;
+      }
+
+      private synchronized void addBucket() {
+        buckets.addLast(currentValue);
+        setValueToCurrent();
+      }
+
+      private synchronized void setValueToCurrent() {
+        value += currentValue;
+        currentValue = 0;
+      }
+
+      private synchronized void removeBucket() {
+        int removed = buckets.removeFirst();
+        value -= removed;
+      }
+    }
+  }
+
+  private static class StatUpdater {
+
+    protected final Map<String, TimeStat> statToCollect = 
+      new HashMap<String, TimeStat>();
+
+    synchronized void addTimeStat(String name, TimeStat s) {
+      statToCollect.put(name, s);
+    }
+
+    synchronized TimeStat removeTimeStat(String name) {
+      return statToCollect.remove(name);
+    }
+
+    synchronized void update() {
+      for (TimeStat stat : statToCollect.values()) {
+        stat.setValueToCurrent();
+      }
+    }
+  }
+
+  /**
+   * Updates TimeWindow statistics in buckets.
+   *
+   */
+  private static class TimeWindowStatUpdater extends StatUpdater{
+
+    final int collectBuckets;
+    final int updatesPerBucket;
+    
+    private int updates;
+    private int buckets;
+
+    TimeWindowStatUpdater(TimeWindow w, int updatePeriod) {
+      if (updatePeriod > w.updateGranularity) {
+        throw new RuntimeException(
+            "Invalid conf: updatePeriod > updateGranularity");
+      }
+      collectBuckets = w.windowSize / w.updateGranularity;
+      updatesPerBucket = w.updateGranularity / updatePeriod;
+    }
+
+    synchronized void update() {
+      updates++;
+      if (updates == updatesPerBucket) {
+        for(TimeStat stat : statToCollect.values()) {
+          stat.addBucket();
+        }
+        updates = 0;
+        buckets++;
+        if (buckets > collectBuckets) {
+          for (TimeStat stat : statToCollect.values()) {
+            stat.removeBucket();
+          }
+          buckets--;
+        }
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestStatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestStatisticsCollector.java?rev=793457&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestStatisticsCollector.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestStatisticsCollector.java Mon Jul 13 05:21:37 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
+import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+
+public class TestStatisticsCollector extends TestCase{
+
+  public void testMovingWindow() throws Exception {
+    StatisticsCollector collector = new StatisticsCollector(1);
+    TimeWindow window = new TimeWindow("test", 6, 2);
+    TimeWindow sincStart = StatisticsCollector.SINCE_START;
+    TimeWindow[] windows = {sincStart, window};
+    
+    Stat stat = collector.createStat("m1", windows);
+    
+    stat.inc(3);
+    collector.update();
+    assertEquals(0, stat.getValues().get(window).getValue());
+    assertEquals(3, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(3);
+    collector.update();
+    assertEquals((3+3), stat.getValues().get(window).getValue());
+    assertEquals(6, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(10);
+    collector.update();
+    assertEquals((3+3), stat.getValues().get(window).getValue());
+    assertEquals(16, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(10);
+    collector.update();
+    assertEquals((3+3+10+10), stat.getValues().get(window).getValue());
+    assertEquals(26, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(10);
+    collector.update();
+    stat.inc(10);
+    collector.update();
+    assertEquals((3+3+10+10+10+10), stat.getValues().get(window).getValue());
+    assertEquals(46, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(10);
+    collector.update();
+    assertEquals((3+3+10+10+10+10), stat.getValues().get(window).getValue());
+    assertEquals(56, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(12);
+    collector.update();
+    assertEquals((10+10+10+10+10+12), stat.getValues().get(window).getValue());
+    assertEquals(68, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(13);
+    collector.update();
+    assertEquals((10+10+10+10+10+12), stat.getValues().get(window).getValue());
+    assertEquals(81, stat.getValues().get(sincStart).getValue());
+    
+    stat.inc(14);
+    collector.update();
+    assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
+    assertEquals(95, stat.getValues().get(sincStart).getValue());
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/machines.jsp?rev=793457&r1=793456&r2=793457&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/machines.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/machines.jsp Mon Jul 13 05:21:37 2009
@@ -50,9 +50,10 @@
       out.println("<h2>Task Trackers</h2>");
       c = tracker.taskTrackers();
     }
-    int noCols = 9;
+    int noCols = 9 + 
+      (2 * tracker.getStatistics().collector.DEFAULT_COLLECT_WINDOWS.length);
     if(type.equals("blacklisted")) {
-      noCols = 10;
+      noCols = noCols + 1;
     }
     if (c.size() == 0) {
       out.print("There are currently no known " + type + " Task Trackers.");
@@ -70,6 +71,12 @@
       if(type.equals("blacklisted")) {
       	out.print("<td><b>Reason For blacklisting</b></td>");
       }
+      for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+           collector.DEFAULT_COLLECT_WINDOWS) {
+         out.println("<td><b>Total Tasks "+window.name+"</b></td>");
+         out.println("<td><b>Succeeded Tasks "+window.name+"</b></td>");
+       }
+      
       out.print("<td><b>Seconds since heartbeat</b></td></tr>\n");
 
       int maxFailures = 0;
@@ -112,6 +119,16 @@
         if(type.equals("blacklisted")) {
           out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
         }
+        for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+          collector.DEFAULT_COLLECT_WINDOWS) {
+          JobTrackerStatistics.TaskTrackerStat ttStat = tracker.getStatistics().
+             getTaskTrackerStat(tt.getTrackerName());
+          out.println("</td><td>" + ttStat.totalTasksStat.getValues().
+                                get(window).getValue());
+          out.println("</td><td>" + ttStat.succeededTasksStat.getValues().
+                                get(window).getValue());
+        }
+        
         out.print("</td><td>" + sinceHeartbeat + "</td></tr>\n");
       }
       out.print("</table>\n");



Mime
View raw message