hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r654315 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Wed, 07 May 2008 23:01:49 GMT
Author: omalley
Date: Wed May  7 16:01:48 2008
New Revision: 654315

URL: http://svn.apache.org/viewvc?rev=654315&view=rev
Log:
HADOOP-3297. Fetch more task completion events from the job
tracker and task tracker. Contributed by Devaraj Das.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May  7 16:01:48 2008
@@ -125,6 +125,9 @@
 
     HADOOP-3248. Optimization of saveFSImage. (Dhruba via shv)
 
+    HADOOP-3297. Fetch more task completion events from the job
+    tracker and task tracker. (ddas via omalley)
+
   BUG FIXES
 
     HADOOP-2905. 'fsck -move' triggers NPE in NameNode. 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May  7 16:01:48 2008
@@ -430,6 +430,9 @@
     /** Number of ms before timing out a copy */
     private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
     
+    /** Max events to fetch in one go from the tasktracker */
+    private static final int MAX_EVENTS_TO_FETCH = 10000;
+
     /**
      * our reduce task instance
      */
@@ -528,11 +531,6 @@
     private static final long MIN_POLL_INTERVAL = 1000;
     
     /**
-     * the number of map output locations to poll for at one time
-     */  
-    private int probe_sample_size = 100;
-    
-    /**
      * a list of map output locations for fetch retrials 
      */
     private List<MapOutputLocation> retryFetches =
@@ -1014,15 +1012,11 @@
         new ArrayList<MapOutputLocation>(numCopiers);
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
-      int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
       DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
       final Progress copyPhase = 
         reduceTask.getProgress().phase();
       
-      //tweak the probe sample size (make it a function of numCopiers)
-      probe_sample_size = Math.max(numCopiers*5, 50);
-      
       for (int i = 0; i < numOutputs; i++) {
         neededOutputs.add(i);
         copyPhase.addPhase();       // add sub-phase per file
@@ -1067,30 +1061,20 @@
             // MapOutputLocations as values
             knownOutputs.addAll(retryFetches);
              
-            // ensure we have enough to keep us busy
-            boolean busy = isBusy(numInFlight, numCopiers, lowThreshold, 
-                                  uniqueHosts.size(), probe_sample_size, 
-                                  numOutputs - numCopied);
-            if (!busy) {
-              // The call getMapCompletionEvents will update fromEventId to
-              // used for the next call to getMapCompletionEvents
-              int currentNumKnownMaps = knownOutputs.size();
-              int currentNumObsoleteMapIds = obsoleteMapIds.size();
+            // The call getMapCompletionEvents will update fromEventId to
+            // used for the next call to getMapCompletionEvents
+            int currentNumKnownMaps = knownOutputs.size();
+            int currentNumObsoleteMapIds = obsoleteMapIds.size();
               getMapCompletionEvents(fromEventId, knownOutputs);
 
-            
-              LOG.info(reduceTask.getTaskID() + ": " +  
-                     "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
-                     " new map-outputs & " + 
-                     (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
-                     " obsolete map-outputs from tasktracker and " + 
-                     retryFetches.size() + " map-outputs from previous failures"
-                     );
-            } else {
-              LOG.info(" Busy enough - did not query the tasktracker for " 
-                       + "new map outputs. Have "+ retryFetches.size() 
-                       + " map outputs from previous failures");
-            }
+
+            LOG.info(reduceTask.getTaskID() + ": " +  
+                "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
+                " new map-outputs & " + 
+                (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
+                " obsolete map-outputs from tasktracker and " + 
+                retryFetches.size() + " map-outputs from previous failures"
+            );
             // clear the "failed" fetches hashmap
             retryFetches.clear();
           }
@@ -1418,24 +1402,6 @@
       }
     }
     
-    /** Added a check for whether #uniqueHosts < #copiers, and if so conclude
-    * we are not busy enough. The logic is that we fetch only one map output
-    * at a time from any given host and uniqueHosts keep a track of that. 
-    * As soon as we add a host to uniqueHosts, a 'copy' from that is 
-    * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers,
-    * it means that all copiers are busy. Although the converse is not true
-    * (e.g. in the case where we have more copiers than the number of hosts
-    * in the cluster), but it should generally be useful to do this check. 
-    **/ 
-    private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
-                           int uniqueHostsSize, int probeSampleSize,  
-                           int remainCopy) {
-      if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) || 
-          uniqueHostsSize < numCopiers) {
-        return false;
-      }
-      return true;
-    }
     
     private CopyResult getCopyResult() {  
       synchronized (copyResults) {
@@ -1477,7 +1443,7 @@
       
       TaskCompletionEvent events[] = 
         umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
-                                         fromEventId.get(), probe_sample_size);
+                                       fromEventId.get(), MAX_EVENTS_TO_FETCH);
       
       // Note the last successful poll time-stamp
       lastPollTime = currentTime;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed May  7 16:01:48 2008
@@ -161,12 +161,11 @@
   /**
    * the minimum interval between jobtracker polls
    */
-  private static final int MIN_POLL_INTERVAL = 5000;
   private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
   /**
    * Number of maptask completion events locations to poll for at one time
    */  
-  private int probe_sample_size = 50;
+  private int probe_sample_size = 500;
     
   private ShuffleServerMetrics shuffleServerMetrics;
   /** This class contains the methods that should be used for metrics-reporting
@@ -412,9 +411,8 @@
 
     this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
     this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
-    int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
     //tweak the probe sample size (make it a function of numCopiers)
-    probe_sample_size = Math.max(numCopiers*5, 50);
+    probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
     
     
     this.myMetrics = new TaskTrackerMetrics();
@@ -530,33 +528,15 @@
           }
           // now fetch all the map task events for all the reduce tasks
           // possibly belonging to different jobs
+          boolean fetchAgain = false; //flag signifying whether we want to fetch
+                                      //immediately again.
           for (FetchStatus f : fList) {
+            long currentTime = System.currentTimeMillis();
             try {
-              f.fetchMapCompletionEvents();
-              long startWait;
-              long endWait;
-              // polling interval is heartbeat interval
-              int waitTime = heartbeatInterval;
-              // Thread will wait for a minumum of MIN_POLL_INTERVAL, 
-              // if it is notified before that, notification will be ignored. 
-              int minWait = MIN_POLL_INTERVAL;
-              synchronized (waitingOn) {
-                try {
-                  while (true) {
-                    startWait = System.currentTimeMillis();
-                    waitingOn.wait(waitTime);
-                    endWait = System.currentTimeMillis();
-                    int diff = (int)(endWait - startWait);
-                    if (diff >= minWait) {
-                      break;
-                    }
-                    minWait = minWait - diff;
-                    waitTime = minWait;
-                  }
-                } catch (InterruptedException ie) {
-                  LOG.info("Shutting down: " + getName());
-                  return;
-                }
+              //the method below will return true when we have not 
+              //fetched all available events yet
+              if (f.fetchMapCompletionEvents(currentTime)) {
+                fetchAgain = true;
               }
             } catch (Exception e) {
               LOG.warn(
@@ -565,6 +545,17 @@
                        StringUtils.stringifyException(e)); 
             }
           }
+          synchronized (waitingOn) {
+            try {
+              int waitTime;
+              if (!fetchAgain) {
+                waitingOn.wait(heartbeatInterval);
+              }
+            } catch (InterruptedException ie) {
+              LOG.info("Shutting down: " + getName());
+              return;
+            }
+          }
         } catch (Exception e) {
           LOG.info("Ignoring exception "  + e.getMessage());
         }
@@ -579,6 +570,8 @@
     private List<TaskCompletionEvent> allMapEvents;
     /** What jobid this fetchstatus object is for*/
     private JobID jobId;
+    private long lastFetchTime;
+    private boolean fetchAgain;
      
     public FetchStatus(JobID jobId, int numMaps) {
       this.fromEventId = new IntWritable(0);
@@ -610,12 +603,26 @@
       return mapEvents;
     }
       
-    public void fetchMapCompletionEvents() throws IOException {
+    public boolean fetchMapCompletionEvents(long currTime) throws IOException {
+      if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
+        return false;
+      }
+      int currFromEventId = fromEventId.get();
       List <TaskCompletionEvent> recentMapEvents = 
         queryJobTracker(fromEventId, jobId, jobClient);
       synchronized (allMapEvents) {
         allMapEvents.addAll(recentMapEvents);
       }
+      lastFetchTime = currTime;
+      if (fromEventId.get() - currFromEventId >= probe_sample_size) {
+        //return true when we have fetched the full payload, indicating
+        //that we should fetch again immediately (there might be more to
+        //fetch
+        fetchAgain = true;
+        return true;
+      }
+      fetchAgain = false;
+      return false;
     }
   }
 



Mime
View raw message