hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r600499 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/
Date Mon, 03 Dec 2007 12:13:19 GMT
Author: ddas
Date: Mon Dec  3 04:13:18 2007
New Revision: 600499

URL: http://svn.apache.org/viewvc?rev=600499&view=rev
Log:
HADOOP-1900. Makes the heartbeat interval and the task-event query interval dependent on the
cluster size. Contributed by Amareshwari Sri Ramadasu.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec  3 04:13:18 2007
@@ -91,6 +91,9 @@
     HADOOP-1965. Makes the sortAndSpill in MapTask a separate thread.
     (Amar Kamat via ddas)
 
+    HADOOP-1900. Makes the heartbeat and task event queries interval 
+    dependent on the cluster size.  (Amareshwari Sri Ramadasu via ddas)
+
   BUG FIXES
 
     HADOOP-2100.  Remove faulty check for existence of $HADOOP_PID_DIR and let

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java Mon Dec  3 04:13:18 2007
@@ -35,6 +35,7 @@
 class HeartbeatResponse implements Writable, Configurable {
   Configuration conf = null;
   short responseId;
+  int heartbeatInterval;
   TaskTrackerAction[] actions;
 
   HeartbeatResponse() {}
@@ -42,6 +43,7 @@
   HeartbeatResponse(short responseId, TaskTrackerAction[] actions) {
     this.responseId = responseId;
     this.actions = actions;
+    this.heartbeatInterval = MRConstants.HEARTBEAT_INTERVAL_MIN;
   }
   
   public void setResponseId(short responseId) {
@@ -68,8 +70,17 @@
     return conf;
   }
 
+  public void setHeartbeatInterval(int interval) {
+    this.heartbeatInterval = interval;
+  }
+  
+  public int getHeartbeatInterval() {
+    return heartbeatInterval;
+  }
+  
   public void write(DataOutput out) throws IOException {
     out.writeShort(responseId);
+    out.writeInt(heartbeatInterval);
     if (actions == null) {
       WritableUtils.writeVInt(out, 0);
     } else {
@@ -84,6 +95,7 @@
   
   public void readFields(DataInput in) throws IOException {
     this.responseId = in.readShort();
+    this.heartbeatInterval = in.readInt();
     int length = WritableUtils.readVInt(in);
     if (length > 0) {
       actions = new TaskTrackerAction[length];

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Dec  3 04:13:18 2007
@@ -37,8 +37,9 @@
    * version 6 adds maxTasks to TaskTrackerStatus for HADOOP-1245
    * version 7 replaces maxTasks by maxMapTasks and maxReduceTasks in 
    * TaskTrackerStatus for HADOOP-1274
+   * Version 8: HeartbeatResponse is added with the next heartbeat interval.
    */
-  public static final long versionID = 7L;
+  public static final long versionID = 8L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Dec  3 04:13:18 2007
@@ -1179,6 +1179,9 @@
       actions.addAll(killTasksList);
     }
      
+    // calculate next heartbeat interval and put in heartbeat response
+    int nextInterval = getNextHeartbeatInterval();
+    response.setHeartbeatInterval(nextInterval);
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));
         
@@ -1190,7 +1193,21 @@
         
     return response;
   }
-    
+  
+  /**
+   * Calculates next heartbeat interval using cluster size.
+   * Heartbeat interval is incremented 1second for every 50 nodes. 
+   * @return next heartbeat interval.
+   */
+  private int getNextHeartbeatInterval() {
+    // get the no of task trackers
+    int clusterSize = getClusterStatus().getTaskTrackers();
+    int heartbeatInterval =  Math.max(
+                                1000 * (clusterSize / CLUSTER_INCREMENT + 1),
+                                HEARTBEAT_INTERVAL_MIN) ;
+    return heartbeatInterval;
+  }
+
   /**
    * Return if the specified tasktracker is in the hosts list, 
    * if one was configured.  If none was configured, then this 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Mon Dec  3 04:13:18 2007
@@ -25,7 +25,9 @@
   //
   // Timeouts, constants
   //
-  public static final long HEARTBEAT_INTERVAL = 10 * 1000;
+  public static final int HEARTBEAT_INTERVAL_MIN = 5 * 1000;
+  
+  public static final int CLUSTER_INCREMENT = 50;
 
   //for the inmemory filesystem (to do in-memory merge)
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=600499&r1=600498&r2=600499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Dec  3 04:13:18 2007
@@ -161,7 +161,8 @@
   /**
    * the minimum interval between jobtracker polls
    */
-  private static final long MIN_POLL_INTERVAL = 5000;
+  private static final int MIN_POLL_INTERVAL = 5000;
+  private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
   /**
    * Number of maptask completion events locations to poll for at one time
    */  
@@ -452,6 +453,9 @@
     mapEventsFetcher.start();
   }
     
+  // Object on wait which MapEventsFetcherThread is going to wait.
+  private Object waitingOn = new Object();
+
   private class MapEventsFetcherThread extends Thread {
 
     private List <FetchStatus> reducesInShuffle() {
@@ -507,14 +511,31 @@
           // possibly belonging to different jobs
           for (FetchStatus f : fList) {
             try {
-                
               f.fetchMapCompletionEvents();
-                
-              try {
-                Thread.sleep(MIN_POLL_INTERVAL);
-              } catch (InterruptedException ie) {
-                LOG.info("Shutting down: " + getName());
-                return;
+              long startWait;
+              long endWait;
+              // polling interval is heartbeat interval
+              int waitTime = heartbeatInterval;
+              // Thread will wait for a minumum of MIN_POLL_INTERVAL, 
+              // if it is notified before that, notification will be ignored. 
+              int minWait = MIN_POLL_INTERVAL;
+              synchronized (waitingOn) {
+                try {
+                  while (true) {
+                    startWait = System.currentTimeMillis();
+                    waitingOn.wait(waitTime);
+                    endWait = System.currentTimeMillis();
+                    int diff = (int)(endWait - startWait);
+                    if (diff >= minWait) {
+                      break;
+                    }
+                    minWait = minWait - diff;
+                    waitTime = minWait;
+                  }
+                } catch (InterruptedException ie) {
+                  LOG.info("Shutting down: " + getName());
+                  return;
+                }
               }
             } catch (Exception e) {
               LOG.warn(
@@ -548,12 +569,21 @@
         
       TaskCompletionEvent[] mapEvents = 
         TaskCompletionEvent.EMPTY_ARRAY;
+      boolean notifyFetcher = false; 
       synchronized (allMapEvents) {
         if (allMapEvents.size() > fromId) {
           int actualMax = Math.min(max, (allMapEvents.size() - fromId));
           List <TaskCompletionEvent> eventSublist = 
             allMapEvents.subList(fromId, actualMax + fromId);
           mapEvents = eventSublist.toArray(mapEvents);
+        } else {
+          // Notify Fetcher thread. 
+          notifyFetcher = true;
+        }
+      }
+      if (notifyFetcher) {
+        synchronized (waitingOn) {
+          waitingOn.notify();
         }
       }
       return mapEvents;
@@ -824,7 +854,7 @@
       try {
         long now = System.currentTimeMillis();
 
-        long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
+        long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
           // sleeps for the wait time, wakes up if a task is finished.
           synchronized(finishedCount) {
@@ -848,6 +878,8 @@
         }
             
         lastHeartbeat = now;
+        // resetting heartbeat interval from the response.
+        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
         if (actions != null){ 
           for(TaskTrackerAction action: actions) {



Mime
View raw message