hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r535982 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Mon, 07 May 2007 20:44:15 GMT
Author: cutting
Date: Mon May  7 13:44:14 2007
New Revision: 535982

URL: http://svn.apache.org/viewvc?view=rev&rev=535982
Log:
HADOOP-1270.  Randomize the fetch of map outputs, speeding the shuffle.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535982&r1=535981&r2=535982
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May  7 13:44:14 2007
@@ -345,6 +345,9 @@
 102. HADOOP-1326.  Change JobClient#RunJob() to return the job.
     (omalley via cutting)
 
+103. HADOOP-1270.  Randomize the fetch of map outputs, speeding the
+     shuffle.  (Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=535982&r1=535981&r2=535982
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May  7 13:44:14 2007
@@ -29,7 +29,6 @@
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -484,10 +483,10 @@
     private int probe_sample_size = 100;
     
     /**
-     * a hashmap from mapId to MapOutputLocation for retrials
+     * a list of map output locations for fetch retrials 
      */
-    private Map<Integer, MapOutputLocation> retryFetches =
-      new HashMap<Integer, MapOutputLocation>();
+    private List<MapOutputLocation> retryFetches =
+      new ArrayList<MapOutputLocation>();
     
     /** 
      * a TreeSet for needed map outputs
@@ -495,6 +494,8 @@
     private Set <Integer> neededOutputs = 
       Collections.synchronizedSet(new TreeSet<Integer>());
     
+    private Random random = null;
+    
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
@@ -783,12 +784,19 @@
       this.shuffleMetrics = 
         MetricsUtil.createRecord(metricsContext, "shuffleInput");
       this.shuffleMetrics.setTag("user", conf.getUser());
+
+      // Seed the random number generator with a reasonably globally unique seed
+      long randomSeed = System.nanoTime() + 
+                        (long)Math.pow(this.reduceTask.getPartition(),
+                                       (this.reduceTask.getPartition()%10)
+                                      );
+      this.random = new Random(randomSeed);
     }
     
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
-      Map<Integer, MapOutputLocation> knownOutputs = 
-        new HashMap<Integer, MapOutputLocation>();
+      List<MapOutputLocation> knownOutputs = 
+        new ArrayList<MapOutputLocation>(numCopiers);
       int            numInFlight = 0, numCopied = 0;
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
@@ -834,15 +842,14 @@
             // The replacements, if at all, will happen when we query the
             // tasktracker and put the mapId hashkeys with new 
             // MapOutputLocations as values
-            knownOutputs.putAll(retryFetches);
+            knownOutputs.addAll(retryFetches);
             // The call getsMapCompletionEvents will modify fromEventId to a val
             // that it should be for the next call to getSuccessMapEvents
             List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
 
             // put discovered them on the known list
             for (int i=0; i < locs.size(); i++) {
-              knownOutputs.put(new Integer(locs.get(i).getMapId()), 
-                      locs.get(i));
+              knownOutputs.add(locs.get(i));
             }
             LOG.info(reduceTask.getTaskId() +
                     " Got " + locs.size() + 
@@ -865,7 +872,11 @@
                    " known map output location(s); scheduling...");
           
           synchronized (scheduledCopies) {
-            Iterator locIt = knownOutputs.values().iterator();
+            // Randomize the map output locations to prevent 
+            // all reduce-tasks swamping the same tasktracker
+            Collections.shuffle(knownOutputs, this.random);
+            
+            Iterator locIt = knownOutputs.iterator();
             
             currentTime = System.currentTimeMillis();
             while (locIt.hasNext()) {
@@ -928,7 +939,7 @@
                          cr.getLocation().getMapTaskId() + " from host: " + 
                          cr.getHost());
               } else {
-                retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
+                retryFetches.add(cr.getLocation());
                 
                 // wait a random amount of time for next contact
                 currentTime = System.currentTimeMillis();
@@ -944,11 +955,11 @@
                 // the failure is due to a lost tasktracker (causes many
                 // unnecessary backoffs). If not, we only take a small hit
                 // polling the tasktracker a few more times
-                Iterator locIt = knownOutputs.values().iterator();
+                Iterator locIt = knownOutputs.iterator();
                 while (locIt.hasNext()) {
                   MapOutputLocation loc = (MapOutputLocation)locIt.next();
                   if (cr.getHost().equals(loc.getHost())) {
-                    retryFetches.put(new Integer(loc.getMapId()), loc);
+                    retryFetches.add(loc);
                     locIt.remove();
                   }
                 }



Mime
View raw message