hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r397953 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java
Date: Fri, 28 Apr 2006 16:56:02 GMT
Author: cutting
Date: Fri Apr 28 09:56:00 2006
New Revision: 397953

URL: http://svn.apache.org/viewcvs?rev=397953&view=rev
Log:
HADOOP-173.  Optimize allocation of tasks with local data.
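
For context, a minimal sketch (not the committed code) of the idea behind this change: at job initialization the block-location hints for every map split are folded into a single host -> list-of-map-tasks index, so that when a TaskTracker reports in, the JobTracker can look up data-local tasks for that host directly instead of scanning all TaskInProgress objects and re-querying cache hints. The class and method names below (LocalTaskIndex, add, pollLocal) are illustrative only and are not part of the Hadoop source.

    // Illustrative sketch only; names are hypothetical, not Hadoop's.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class LocalTaskIndex<T> {
        // host name -> map tasks that have an input block replica on that host
        private final Map<String, List<T>> hostToTasks = new HashMap<String, List<T>>();

        /** Record that 'task' has a replica of its input split on 'host'. */
        void add(String host, T task) {
            List<T> tasks = hostToTasks.get(host);
            if (tasks == null) {
                tasks = new ArrayList<T>();
                hostToTasks.put(host, tasks);
            }
            tasks.add(task);
        }

        /** Return and remove one task whose input is local to 'host', or null if none. */
        T pollLocal(String host) {
            List<T> tasks = hostToTasks.get(host);
            if (tasks == null || tasks.isEmpty()) {
                return null;
            }
            return tasks.remove(tasks.size() - 1);
        }
    }

Built once from fs.getFileCacheHints(...) during task initialization, an index of this shape turns the per-heartbeat scan over maps[] into a single HashMap lookup keyed by tts.getHost(), which is the optimization the patch below implements.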

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=397953&r1=397952&r2=397953&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Apr 28 09:56:00 2006
@@ -114,7 +114,9 @@
 
 30. Permit specification of a higher replication levels for job
     submission files (job.xml and job.jar).  This helps with large
-    clusters, since these files are read by every node.
+    clusters, since these files are read by every node.  (cutting)
+
+31. HADOOP-173.  Optimize allocation of tasks with local data.  (cutting)
 
 
 Release 0.1.1 - 2006-04-08

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=397953&r1=397952&r2=397953&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Apr 28 09:56:00 2006
@@ -44,7 +44,7 @@
     int numReduceTasks = 0;
 
     JobTracker jobtracker = null;
-    TreeMap cachedHints = new TreeMap();
+    HashMap hostToMaps = new HashMap();
 
     long startTime;
     long finishTime;
@@ -159,29 +159,28 @@
         // Obtain some tasktracker-cache information for the map task splits.
         //
         for (int i = 0; i < maps.length; i++) {
-            String hints[][] = fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(), splits[i].getLength());
-            cachedHints.put(maps[i].getTIPId(), hints);
+            String hints[][] =
+              fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
+                                   splits[i].getLength());
+
+            if (hints != null) {
+              for (int k = 0; k < hints.length; k++) {
+                for (int j = 0; j < hints[k].length; j++) {
+                  ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
+                  if (hostMaps == null) {
+                    hostMaps = new ArrayList();
+                    hostToMaps.put(hints[k][j], hostMaps);
+                  }
+                  hostMaps.add(maps[i]);
+                }
+              }
+            }
         }
 
         this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
         tasksInited = true;
     }
 
-    /**
-     * This is called by TaskInProgress objects.  The JobInProgress
-     * prefetches and caches a lot of these hints.  If the hint is
-     * not available, then we pass it through to the filesystem.
-     */
-    String[][] getFileCacheHints(String tipID, Path f, long start, long len) throws IOException {
-        String results[][] = (String[][]) cachedHints.get(tipID);
-        if (tipID == null) {
-            FileSystem fs = FileSystem.get(conf);
-            results = fs.getFileCacheHints(f, start, len);
-            cachedHints.put(tipID, results);
-        }
-        return results;
-    }
-
     /////////////////////////////////////////////////////
     // Accessors for the JobInProgress
     /////////////////////////////////////////////////////
@@ -301,20 +300,18 @@
         // the TaskTracker checking in.  That means the block
         // doesn't have to be transmitted from another node.
         //
-        for (int i = 0; i < maps.length; i++) {
-            int realIdx = (i + firstMapToTry) % maps.length; 
-            if (maps[realIdx].hasTaskWithCacheHit(taskTracker, tts)) {
-                if (cacheTarget < 0) {
-                  if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
-                    if (failedTarget < 0) {
-                      failedTarget = realIdx;
-                    }
-                  } else {
-                    cacheTarget = realIdx;
-                    break;
-                  }
-                }
+        ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
+        if (hostMaps != null) {
+          Iterator i = hostMaps.iterator();
+          while (i.hasNext()) {
+            TaskInProgress tip = (TaskInProgress)i.next();
+            if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
+              LOG.info("Found task with local split for "+tts.getHost());
+              cacheTarget = tip.getIdWithinJob();
+              i.remove();
+              break;
             }
+          }
         }
 
         //

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=397953&r1=397952&r2=397953&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Apr 28 09:56:00 2006
@@ -55,7 +55,6 @@
     // Defines the TIP
     private String jobFile = null;
     private FileSplit split = null;
-    private String hints[][] = null;
     private TaskInProgress predecessors[] = null;
     private int partition;
     private JobTracker jobtracker;
@@ -387,34 +386,7 @@
     // "Action" methods that actually require the TIP
     // to do something.
     /////////////////////////////////////////////////
-    /**
-     * Return whether this TIP has an DFS cache-driven task 
-     * to run at the given taskTracker.
-     */
-    boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) {
-        if (failed || isComplete() || recentTasks.size() > 0) {
-            return false;
-        } else {
-            try {
-                if (isMapTask()) {
-                    if (hints == null) {
-                        hints = job.getFileCacheHints(getTIPId(), split.getPath(), split.getStart(), split.getLength());
-                    }
-                    if (hints != null) {
-                        for (int i = 0; i < hints.length; i++) {
-                            for (int j = 0; j < hints[i].length; j++) {
-                                if (hints[i][j].equals(tts.getHost())) {
-                                    return true;
-                                }
-                            }
-                        }
-                    }
-                }
-            } catch (IOException ie) {
-            }
-            return false;
-        }
-    }
+
     /**
      * Return whether this TIP has a non-speculative task to run
      */
@@ -457,8 +429,7 @@
      */
     public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
         Task t = null;
-        if (hasTaskWithCacheHit(taskTracker, tts) ||
-            hasTask() || 
+        if (hasTask() || 
             hasSpeculativeTask(avgProgress)) {
 
             String taskid = (String) usableTaskIds.first();


