hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r632035 [3/3] - in /hadoop/core/trunk: ./ conf/ docs/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/dfs/ src/test/org/...
Date: Thu, 28 Feb 2008 16:04:47 GMT
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 28 08:04:34 2008
@@ -20,7 +20,7 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +38,7 @@
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.net.*;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -74,8 +75,12 @@
 
   JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
-  Map<String,List<TaskInProgress>> hostToMaps =
-    new HashMap<String,List<TaskInProgress>>();
+
+  // NetworkTopology Node to the set of TIPs
+  Map<Node, List<TaskInProgress>> nodesToMaps;
+  
+  private int maxLevel;
+  
   private int taskCompletionEventTracker = 0; 
   List<TaskCompletionEvent> taskCompletionEvents;
     
@@ -110,7 +115,8 @@
     NUM_FAILED_REDUCES,
     TOTAL_LAUNCHED_MAPS,
     TOTAL_LAUNCHED_REDUCES,
-    DATA_LOCAL_MAPS
+    DATA_LOCAL_MAPS,
+    RACK_LOCAL_MAPS
   }
   private Counters jobCounters = new Counters();
   
@@ -204,8 +210,61 @@
     jobMetrics.removeTag("counter");
     jobMetrics.remove();
   }
+    
+  private Node getParentNode(Node node, int level) {
+    for (int i = 0; node != null && i < level; i++) {
+      node = node.getParent();
+    }
+    return node;
+  }
+  private void printCache (Map<Node, List<TaskInProgress>> cache) {
+    LOG.info("The taskcache info:");
+    for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
+      List <TaskInProgress> tips = n.getValue();
+      LOG.info("Cached TIPs on node: " + n.getKey());
+      for (TaskInProgress tip : tips) {
+        LOG.info("tip : " + tip.getTIPId());
+      }
+    }
+  }
   
+  private Map<Node, List<TaskInProgress>> createCache(
+                         JobClient.RawSplit[] splits, int maxLevel) {
+    Map<Node, List<TaskInProgress>> cache = 
+      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
+    for (int i = 0; i < splits.length; i++) {
+      for(String host: splits[i].getLocations()) {
+        Node node = jobtracker.resolveAndAddToTopology(host);
+        if (node == null) {
+          continue;
+        }
+        if (node.getLevel() < maxLevel) {
+          LOG.warn("Got a host whose level is: " + node.getLevel() +
+              ". Should get at least a level of value: " + maxLevel);
+          return null;
+        }
+        for (int j = 0; j < maxLevel; j++) {
+          node = getParentNode(node, j);
+          List<TaskInProgress> hostMaps = cache.get(node);
+          if (hostMaps == null) {
+            hostMaps = new ArrayList<TaskInProgress>();
+            cache.put(node, hostMaps);
+            hostMaps.add(maps[i]);
+          }
+          //check whether the hostMaps already contains an entry for a TIP
+          //This will be true for nodes that are racks and multiple nodes in
+          //the rack contain the input for a tip. Note that if it already
+          //exists in the hostMaps, it must be the last element there since
+          //we process one TIP at a time sequentially in the split-size order
+          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
+            hostMaps.add(maps[i]);
+          }
+        }
+      }
+    }
+    return cache;
+  }
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
@@ -233,17 +292,11 @@
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
       maps[i] = new TaskInProgress(jobId, jobFile, 
-                                   splits[i].getClassName(),
-                                   splits[i].getBytes(), 
+                                   splits[i], 
                                    jobtracker, conf, this, i);
-      for(String host: splits[i].getLocations()) {
-        List<TaskInProgress> hostMaps = hostToMaps.get(host);
-        if (hostMaps == null) {
-          hostMaps = new ArrayList<TaskInProgress>();
-          hostToMaps.put(host, hostMaps);
-        }
-        hostMaps.add(maps[i]);              
-      }
+    }
+    if (numMapTasks > 0) { 
+      nodesToMaps = createCache(splits, (maxLevel = jobtracker.getNumTaskCacheLevels()));
     }
         
     // if no split is returned, job is considered completed and successful
@@ -410,7 +463,13 @@
       }
 
       if (null != ttStatus){
-        httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + 
+        String host;
+        if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
+          host = NetUtils.getStaticResolution(ttStatus.getHost());
+        } else {
+          host = ttStatus.getHost();
+        }
+        httpTaskLogLocation = "http://" + host + ":" + 
           ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
           status.getTaskId();
       }
@@ -571,9 +630,9 @@
       return null;
     }
     
-    ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+    
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, mapCache);
+                             maps, nodesToMaps);
     if (target == -1) {
       return null;
     }
@@ -695,6 +754,13 @@
     return trackerErrors;
   }
     
+  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
+                                          double avgProgress,
+                                          String taskTracker) {
+    return task.hasSpeculativeTask(avgProgress) && 
+           !task.hasRunOnMachine(taskTracker);
+  }
+  
   /**
    * Find a new task to run.
    * @param tts The task tracker that is asking for a task
@@ -709,14 +775,16 @@
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          List cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks) {
     String taskTracker = tts.getTrackerName();
+    int specTarget = -1;
 
     //
     // Update the last-known clusterSize
     //
     this.clusterSize = clusterSize;
 
+    Node node = jobtracker.getNode(tts.getHost());
     //
     // Check if too many tasks of this job have failed on this
     // tasktracker prior to assigning it a new one.
@@ -734,22 +802,56 @@
         
     //
     // See if there is a split over a block that is stored on
-    // the TaskTracker checking in.  That means the block
-    // doesn't have to be transmitted from another node.
+    // the TaskTracker checking in or the rack it belongs to and so on till
+    // maxLevel.  That means the block
+    // doesn't have to be transmitted from another node/rack/and so on.
+    // The way the cache is updated is such that in every lookup, the TIPs
+    // that are complete are removed. Running/Failed TIPs are not removed
+    // since we want to have locality optimizations even for FAILED/SPECULATIVE
+    // tasks.
     //
-    if (cachedTasks != null) {
-      Iterator i = cachedTasks.iterator();
-      while (i.hasNext()) {
-        TaskInProgress tip = (TaskInProgress)i.next();
-        i.remove();
-        if (tip.isRunnable() && 
-            !tip.isRunning() &&
-            !tip.hasFailedOnMachine(taskTracker)) {
-          LOG.info("Choosing cached task " + tip.getTIPId());
-          int cacheTarget = tip.getIdWithinJob();
-          jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-          return cacheTarget;
+    if (cachedTasks != null && node != null) {
+      Node key = node;
+      for (int level = 0; level < maxLevel && key != null; level++) {
+        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+        if (cacheForLevel != null) {
+          Iterator<TaskInProgress> i = cacheForLevel.iterator();
+          while (i.hasNext()) {
+            TaskInProgress tip = i.next();
+            // we remove only those TIPs that are data-local (the host having
+            // the data is running the task). We don't remove TIPs that are 
+            // rack-local for example since that would negatively impact
+            // the performance of speculative and failed tasks (imagine a case
+            // where we schedule one TIP rack-local and after sometime another
+            // tasktracker from the same rack is asking for a task, and the TIP
+            // in question has either failed or could be a speculative task
+            // candidate)
+            if (tip.isComplete() || level == 0) {
+              i.remove();
+            }
+            if (tip.isRunnable() && 
+                !tip.isRunning() &&
+                !tip.hasFailedOnMachine(taskTracker)) {
+              int cacheTarget = tip.getIdWithinJob();
+              if (level == 0) {
+                LOG.info("Choosing data-local task " + tip.getTIPId());
+                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+              } else if (level == 1){
+                LOG.info("Choosing rack-local task " + tip.getTIPId());
+                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+              } else {
+                LOG.info("Choosing cached task at level " + level + " " + 
+                          tip.getTIPId());
+              }
+              return cacheTarget;
+            }
+            if (specTarget == -1 &&
+                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+              specTarget = tip.getIdWithinJob();
+            }
+          }
         }
+        key = key.getParent();
       }
     }
 
@@ -759,7 +861,6 @@
     // a std. task to run.
     //
     int failedTarget = -1;
-    int specTarget = -1;
     for (int i = 0; i < tasks.length; i++) {
       TaskInProgress task = tasks[i];
       if (task.isRunnable()) {
@@ -781,8 +882,7 @@
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
           } else if (specTarget == -1 &&
-                     task.hasSpeculativeTask(avgProgress) && 
-                     !task.hasRunOnMachine(taskTracker)) {
+                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
             specTarget = i;
           }
         }
@@ -989,7 +1089,28 @@
         
     // the case when the map was complete but the task tracker went down.
     if (wasComplete && !isComplete) {
-      if (tip.isMapTask()){
+      if (tip.isMapTask()) {
+        // Put the task back in the cache. This will help locality for cases
+        // where we have a different TaskTracker from the same rack/switch
+        // asking for a task. Note that we don't add the TIP to the host's cache
+        // again since we don't execute a failed TIP on the same TT again, 
+        // and also we only care about those TIPs that were successful
+        // earlier (wasComplete and !isComplete) 
+        // (since they might have been removed from the cache of other 
+        // racks/switches, if the input split blocks were present there too)
+        for (String host : tip.getSplitLocations()) {
+          Node node = jobtracker.getNode(host);
+          for (int level = 1; (node != null && level < maxLevel); level++) {
+            node = getParentNode(node, level);
+            if (node == null) {
+              break;
+            }
+            List<TaskInProgress> list = nodesToMaps.get(node);
+            if (list != null) {
+              list.add(tip);
+            }
+          }
+        }
         finishedMapTasks -= 1;
       }
     }

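For illustration, below is a minimal, self-contained sketch of the level-based cache lookup this patch introduces in JobInProgress: map tasks are cached against topology nodes (host, then rack, and so on up to mapred.task.cache.levels), and a heartbeating tracker is matched against its own node first, then its parents. The Node and Tip classes are simplified stand-ins, not the Hadoop types.

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class LocalityCacheSketch {

  // Minimal stand-ins for NetworkTopology's Node and for TaskInProgress.
  static class Node {
    final String name;
    final Node parent;
    Node(String name, Node parent) { this.name = name; this.parent = parent; }
  }
  static class Tip {
    final int id;
    Tip(int id) { this.id = id; }
  }

  // Topology node (host or rack) -> map tasks whose input lives under it.
  private final Map<Node, List<Tip>> cache = new IdentityHashMap<Node, List<Tip>>();

  void addTask(Node node, Tip tip) {
    List<Tip> tips = cache.get(node);
    if (tips == null) {
      tips = new ArrayList<Tip>();
      cache.put(node, tips);
    }
    tips.add(tip);
  }

  // Walk from the tracker's leaf node towards the root, at most maxLevel hops:
  // level 0 is data-local, level 1 is rack-local, and so on. Data-local hits
  // are consumed, while rack-level entries stay cached so another tracker on
  // the same rack can still be scheduled locally, mirroring the patch's policy.
  Tip findCachedTask(Node trackerNode, int maxLevel) {
    Node key = trackerNode;
    for (int level = 0; level < maxLevel && key != null; level++) {
      List<Tip> tips = cache.get(key);
      if (tips != null && !tips.isEmpty()) {
        return (level == 0) ? tips.remove(0) : tips.get(0);
      }
      key = key.parent;
    }
    return null;   // caller falls back to a non-local task
  }

  public static void main(String[] args) {
    LocalityCacheSketch sketch = new LocalityCacheSketch();
    Node rack = new Node("rack1", null);
    Node host = new Node("host1.foo.com", rack);
    Tip tip = new Tip(0);
    sketch.addTask(host, tip);
    sketch.addTask(rack, tip);
    System.out.println(sketch.findCachedTask(host, 2).id);   // 0, data-local
  }
}
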
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties Thu Feb 28 08:04:34 2008
@@ -7,4 +7,5 @@
 TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
 TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
 DATA_LOCAL_MAPS.name=          Data-local map tasks
+RACK_LOCAL_MAPS.name=          Rack-local map tasks
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Feb 28 08:04:34 2008
@@ -57,8 +57,14 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /*******************************************************
@@ -77,6 +83,11 @@
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
 
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private NetworkTopology clusterMap = new NetworkTopology();
+  private ResolutionThread resThread = new ResolutionThread();
+  private int numTaskCacheLevels; // the max level of a host in the network topology
+  
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -544,6 +555,13 @@
   // (trackerID --> last sent HeartBeatResponse)
   Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
     new TreeMap<String, HeartbeatResponse>();
+
+  // (trackerHostName --> Node (NetworkTopology))
+  Map<String, Node> trackerNameToNodeMap = 
+    Collections.synchronizedMap(new TreeMap<String, Node>());
+  
+  // Number of resolved entries
+  int numResolved;
     
   //
   // Watch and expire TaskTracker objects using these structures.
@@ -724,6 +742,11 @@
 
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
+    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+        NetworkTopology.DEFAULT_HOST_LEVEL);
     synchronized (this) {
       state = State.RUNNING;
     }
@@ -751,6 +774,7 @@
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
+    this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     this.initJobsThread = new Thread(this.initJobs, "initJobs");
@@ -835,6 +859,15 @@
         ex.printStackTrace();
       }
     }
+    if (this.resThread != null) {
+      LOG.info("Stopping DNSToSwitchMapping Resolution thread");
+      this.resThread.interrupt();
+      try {
+        this.resThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1147,6 +1180,31 @@
     }
   }
 
+  public Node resolveAndAddToTopology(String name) {
+    List <String> tmpList = new ArrayList<String>(1);
+    tmpList.add(name);
+    List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
+    if (rNameList == null || rNameList.size() == 0) {
+      return null;
+    }
+    String rName = rNameList.get(0);
+    String networkLoc = NodeBase.normalize(rName);
+    Node node = null;
+    if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
+      node = new NodeBase(name, networkLoc);
+      clusterMap.add(node);
+    }
+    return node;
+  }
+  public Node getNode(String name) {
+    return trackerNameToNodeMap.get(name);
+  }
+  public int getNumTaskCacheLevels() {
+    return numTaskCacheLevels;
+  }
+  public int getNumResolvedTaskTrackers() {
+    return numResolved;
+  }
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
@@ -1172,8 +1230,9 @@
       throw new DisallowedTaskTrackerException(status);
     }
 
-    // First check if the last heartbeat response got through 
+    // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
+    
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
 
@@ -1346,15 +1405,76 @@
 
         if (initialContact) {
           trackerExpiryQueue.add(trackerStatus);
+          resThread.addToResolutionQueue(trackerStatus);
         }
       }
     }
 
     updateTaskStatuses(trackerStatus);
-
+    
     return true;
   }
 
+  private class ResolutionThread extends Thread {
+    private LinkedBlockingQueue<TaskTrackerStatus> queue = 
+      new LinkedBlockingQueue <TaskTrackerStatus>();
+    public ResolutionThread() {
+      setName("DNSToSwitchMapping reolution Thread");
+      setDaemon(true);
+    }
+    public void addToResolutionQueue(TaskTrackerStatus t) {
+      while (!queue.add(t)) {
+        LOG.warn("Couldn't add to the Resolution queue now. Will " +
+                 "try again");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+    public void run() {
+      while (!isInterrupted()) {
+        try {
+          int size;
+          if((size = queue.size()) == 0) {
+            Thread.sleep(1000);
+            continue;
+          }
+          List <TaskTrackerStatus> statuses = 
+            new ArrayList<TaskTrackerStatus>(size);
+          queue.drainTo(statuses);
+          List<String> dnHosts = new ArrayList<String>(size);
+          for (TaskTrackerStatus t : statuses) {
+            dnHosts.add(t.getHost());
+          }
+          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
+          if (rName == null) {
+            continue;
+          }
+          int i = 0;
+          for (String m : rName) {
+            String host = statuses.get(i++).getHost();
+            String networkLoc = NodeBase.normalize(m);
+            Node node = null;
+            if (clusterMap.getNode(networkLoc+"/"+host) == null) {
+              node = new NodeBase(host, networkLoc);
+              clusterMap.add(node);
+              trackerNameToNodeMap.put(host, node);
+            }
+            numResolved++;
+          }
+        } catch (InterruptedException ie) {
+          LOG.warn(getName() + " exiting, got interrupted: " + 
+                   StringUtils.stringifyException(ie));
+          return;
+        } catch (Throwable t) {
+          LOG.error(getName() + " got an exception: " +
+              StringUtils.stringifyException(t));
+        }
+      }
+      LOG.warn(getName() + " exiting...");
+    }
+  }
+  
   /**
    * Returns a task we'd like the TaskTracker to execute right now.
    *

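The JobTracker now loads its resolver through topology.node.switch.mapping.impl and reads the cache depth from mapred.task.cache.levels. A minimal sketch of that initialization, mirroring the code above (the standalone class and print statement are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.ReflectionUtils;

public class TopologyPluginSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The two knobs the JobTracker reads above: the resolver implementation
    // and the number of cache levels (2 = host level + rack level).
    conf.setClass("topology.node.switch.mapping.impl",
                  ScriptBasedMapping.class, DNSToSwitchMapping.class);
    conf.setInt("mapred.task.cache.levels", NetworkTopology.DEFAULT_HOST_LEVEL);

    DNSToSwitchMapping mapping = (DNSToSwitchMapping) ReflectionUtils.newInstance(
        conf.getClass("topology.node.switch.mapping.impl",
                      ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
    int cacheLevels = conf.getInt("mapred.task.cache.levels",
                                  NetworkTopology.DEFAULT_HOST_LEVEL);
    System.out.println(mapping.getClass().getName() + ", levels=" + cacheLevels);
  }
}
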
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Feb 28 08:04:34 2008
@@ -39,6 +39,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.*;
 
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
@@ -405,6 +406,17 @@
     }
     this.mapOutputFile.setConf(this.conf);
     this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    // add the static resolutions (this is required for JUnit testcases
+    // that simulate multiple nodes on a single physical
+    // node).
+    String hostToResolved[] = conf.getStrings("hadoop.net.static.resolutions");
+    if (hostToResolved != null) {
+      for (String str : hostToResolved) {
+        String name = str.substring(0, str.indexOf('='));
+        String resolvedName = str.substring(str.indexOf('=') + 1);
+        NetUtils.addStaticResolution(name, resolvedName);
+      }
+    }
   }
 
   public Configuration getConf() {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb 28 08:04:34 2008
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
 
 
 /*************************************************************
@@ -62,8 +63,7 @@
 
   // Defines the TIP
   private String jobFile = null;
-  private String splitClass = null;
-  private BytesWritable split = null;
+  private RawSplit rawSplit;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
@@ -115,17 +115,17 @@
   private TreeMap<String, Boolean> tasksToKill = new TreeMap<String, Boolean>();
   
   private Counters counters = new Counters();
+  
 
   /**
    * Constructor for MapTask
    */
   public TaskInProgress(String jobid, String jobFile, 
-                        String splitClass, BytesWritable split, 
+                        RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
     this.jobFile = jobFile;
-    this.splitClass = splitClass;
-    this.split = split;
+    this.rawSplit = rawSplit;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -233,7 +233,7 @@
    * Whether this is a map task
    */
   public boolean isMapTask() {
-    return split != null;
+    return rawSplit != null;
   }
     
   /**
@@ -570,6 +570,13 @@
   }
 
   /**
+   * Get the split locations 
+   */
+  public String[] getSplitLocations() {
+    return rawSplit.getLocations();
+  }
+  
+  /**
    * Get the Status of the tasks managed by this TIP
    */
   public TaskStatus[] getTaskStatuses() {
@@ -726,7 +733,7 @@
 
     if (isMapTask()) {
       t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
-                      splitClass, split);
+                      rawSplit.getClassName(), rawSplit.getBytes());
     } else {
       t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 28 08:04:34 2008
@@ -389,10 +389,15 @@
   synchronized void initialize() throws IOException {
     // use configured nameserver & interface to get local hostname
     this.fConf = new JobConf(originalConf);
-    this.localHostname =
+    if (fConf.get("slave.host.name") != null) {
+      this.localHostname = fConf.get("slave.host.name");
+    }
+    if (localHostname == null) {
+      this.localHostname =
       DNS.getDefaultHost
       (fConf.get("mapred.tasktracker.dns.interface","default"),
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
+    }
  
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
@@ -1444,6 +1449,19 @@
       }
       task.localizeConfiguration(localJobConf);
       
+      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
+      if (staticResolutions != null && staticResolutions.size() > 0) {
+        StringBuffer str = new StringBuffer();
+
+        for (int i = 0; i < staticResolutions.size(); i++) {
+          String[] hostToResolved = staticResolutions.get(i);
+          str.append(hostToResolved[0]+"="+hostToResolved[1]);
+          if (i != staticResolutions.size() - 1) {
+            str.append(',');
+          }
+        }
+        localJobConf.set("hadoop.net.static.resolutions", str.toString());
+      }
       OutputStream out = localFs.create(localTaskFile);
       try {
         localJobConf.write(out);

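The Task.java and TaskTracker.java changes above work as a pair: the TaskTracker serializes every known static resolution into the localized job configuration as a comma-separated list of host=resolved pairs under hadoop.net.static.resolutions, and Task.configure() parses them back and re-registers each mapping. A hedged round-trip sketch, using the NetUtils helpers added later in this commit (class name and hostnames are made up):

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class StaticResolutionRoundTrip {
  public static void main(String[] args) {
    NetUtils.addStaticResolution("host1.foo.com", "localhost");
    NetUtils.addStaticResolution("host2.foo.com", "localhost");

    // TaskTracker side: serialize every known resolution into the localized
    // job configuration as "host=resolved,host=resolved".
    List<String[]> resolutions = NetUtils.getAllStaticResolutions();
    StringBuffer str = new StringBuffer();
    for (int i = 0; i < resolutions.size(); i++) {
      String[] pair = resolutions.get(i);
      str.append(pair[0]).append('=').append(pair[1]);
      if (i != resolutions.size() - 1) {
        str.append(',');
      }
    }
    Configuration conf = new Configuration();
    conf.set("hadoop.net.static.resolutions", str.toString());

    // Task side: parse the property back and re-register each mapping.
    for (String entry : conf.getStrings("hadoop.net.static.resolutions")) {
      String host = entry.substring(0, entry.indexOf('='));
      String resolved = entry.substring(entry.indexOf('=') + 1);
      System.out.println(host + " -> " + resolved);
    }
  }
}
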
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/DNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/DNSToSwitchMapping.java?rev=632035&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/DNSToSwitchMapping.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/DNSToSwitchMapping.java Thu Feb 28 08:04:34 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.List;
+
+/**
+ * An interface that should be implemented to allow pluggable 
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+public interface DNSToSwitchMapping {
+  /**
+   * Resolves a list of DNS-names/IP-addresses and returns back a list of
+   * switch information (network paths). One-to-one correspondence must be 
+   * maintained between the elements in the lists. 
+   * Consider an element in the argument list - x.y.com. The switch information
+   * that is returned must be a network path of the form /foo/rack, 
+   * where / is the root, and 'foo' is the switch where 'rack' is connected.
+   * Note the hostname/ip-address is not part of the returned path.
+   * The network topology of the cluster would determine the number of
+   * components in the network path.
+   * @param names
+   * @return list of resolved network paths
+   */
+  public List<String> resolve(List<String> names);
+}

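Any class implementing this interface can be plugged in through topology.node.switch.mapping.impl. A minimal illustrative implementation backed by a fixed in-memory table (the class name and host/rack values are made up; real clusters would normally use the script-based mapping added below):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;

public class TableDrivenMapping implements DNSToSwitchMapping {
  private final Map<String, String> table = new HashMap<String, String>();

  public TableDrivenMapping() {
    table.put("host1.foo.com", "/dc1/rack1");
    table.put("host2.foo.com", "/dc1/rack2");
  }

  // One entry in the result for every name, in the same order; unknown hosts
  // fall back to the default rack rather than failing the whole batch.
  public List<String> resolve(List<String> names) {
    List<String> paths = new ArrayList<String>(names.size());
    for (String name : names) {
      String rack = table.get(name);
      paths.add(rack != null ? rack : NetworkTopology.DEFAULT_RACK);
    }
    return paths;
  }
}
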
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java Thu Feb 28 08:04:34 2008
@@ -19,6 +19,9 @@
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Map.Entry;
+import java.util.*;
+
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +33,9 @@
 
 public class NetUtils {
   private static final Log LOG = LogFactory.getLog(NetUtils.class);
+  
+  private static Map<String, String> hostToResolved = 
+                                     new HashMap<String, String>();
 
   /**
    * Get the socket factory for the given class according to its
@@ -122,6 +128,9 @@
       port = addr.getPort();
     }
   
+    if (getStaticResolution(hostname) != null) {
+      hostname = getStaticResolution(hostname);
+    }
     return new InetSocketAddress(hostname, port);
   }
 
@@ -163,5 +172,56 @@
                " is deprecated. Use " + newBindAddressName + " instead.");      
     }
     return oldAddr + ":" + oldPort;
+  }
+  
+  /**
+   * Adds a static resolution for host. This can be used for setting up
+   * fake hostnames that point to a well-known host. For example, some
+   * testcases need daemons with different hostnames
+   * running on the same machine. In order to create connections to these
+   * daemons, one can set up mappings from those hostnames to "localhost".
+   * {@link NetUtils#getStaticResolution(String)} can be used to query for
+   * the actual hostname. 
+   * @param host
+   * @param resolvedName
+   */
+  public static void addStaticResolution(String host, String resolvedName) {
+    synchronized (hostToResolved) {
+      hostToResolved.put(host, resolvedName);
+    }
+  }
+  
+  /**
+   * Retrieves the resolved name for the passed host. The resolved name must
+   * have been set earlier using 
+   * {@link NetUtils#addStaticResolution(String, String)}
+   * @param host
+   * @return the resolution
+   */
+  public static String getStaticResolution(String host) {
+    synchronized (hostToResolved) {
+      return hostToResolved.get(host);
+    }
+  }
+  
+  /**
+   * This is used to get all the resolutions that were added using
+   * {@link NetUtils#addStaticResolution(String, String)}. The return
+   * value is a List each element of which contains an array of String 
+   * of the form String[0]=hostname, String[1]=resolved-hostname
+   * @return the list of resolutions
+   */
+  public static List <String[]> getAllStaticResolutions() {
+    synchronized (hostToResolved) {
+      Set <Entry <String, String>>entries = hostToResolved.entrySet();
+      if (entries.size() == 0) {
+        return null;
+      }
+      List <String[]> l = new ArrayList<String[]>(entries.size());
+      for (Entry<String, String> e : entries) {
+        l.add(new String[] {e.getKey(), e.getValue()});
+      }
+    return l;
+    }
   }
 }

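A short usage sketch of the new static-resolution helpers (hostnames are made up). The change inside the address-parsing code above means the resolution table is also consulted when building socket addresses, so a connection aimed at a registered fake name is transparently redirected; the createSocketAddr(String) call below assumes that existing entry point.

import java.net.InetSocketAddress;

import org.apache.hadoop.net.NetUtils;

public class StaticResolutionExample {
  public static void main(String[] args) {
    // Register a fake hostname that should resolve to the local machine.
    NetUtils.addStaticResolution("host1.foo.com", "localhost");

    System.out.println(NetUtils.getStaticResolution("host1.foo.com"));   // localhost
    System.out.println(NetUtils.getStaticResolution("unknown.foo.com")); // null

    // The fake name is rewritten to localhost before the address is built.
    InetSocketAddress addr = NetUtils.createSocketAddr("host1.foo.com:50010");
    System.out.println(addr.getHostName() + ":" + addr.getPort());
  }
}
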
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Thu Feb 28 08:04:34 2008
@@ -38,6 +38,8 @@
  */
 public class NetworkTopology {
   public final static String DEFAULT_RACK = "/default-rack";
+  public final static String UNRESOLVED = "";
+  public final static int DEFAULT_HOST_LEVEL = 2;
   public static final Log LOG = 
     LogFactory.getLog("org.apache.hadoop.net.NetworkTopology");
     
@@ -389,11 +391,16 @@
    *          a path-like string representation of a node
    * @return a reference to the node; null if the node is not in the tree
    */
-  private Node getNode(String loc) {
-    loc = NodeBase.normalize(loc);
-    if (!NodeBase.ROOT.equals(loc))
-      loc = loc.substring(1);
-    return clusterMap.getLoc(loc);
+  public Node getNode(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc))
+        loc = loc.substring(1);
+      return clusterMap.getLoc(loc);
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
     
   /** Return the total number of racks */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/Node.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/Node.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/Node.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/Node.java Thu Feb 28 08:04:34 2008
@@ -30,6 +30,8 @@
 public interface Node {
   /** Return the string representation of this node's network location */
   public String getNetworkLocation();
+  /** Set the node's network location */
+  public void setNetworkLocation(String location);
   /** Return this node's name */
   public String getName();
   /** Return this node's parent */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/NodeBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/NodeBase.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/NodeBase.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/NodeBase.java Thu Feb 28 08:04:34 2008
@@ -84,6 +84,9 @@
   /** Return this node's network location */
   public String getNetworkLocation() { return location; }
   
+  /** Set this node's network location */
+  public void setNetworkLocation(String location) { this.location = location; }
+  
   /** Return this node's path */
   public static String getPath(Node node) {
     return node.getNetworkLocation()+PATH_SEPARATOR_STR+node.getName();

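With NetworkTopology.getNode() made public and setNetworkLocation() added to the Node interface, callers such as the JobTracker can build a topology and look hosts up by their full path, as resolveAndAddToTopology() does earlier in this commit. A small sketch (host and rack names are made up):

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class TopologyLookupExample {
  public static void main(String[] args) {
    NetworkTopology topology = new NetworkTopology();
    Node host = new NodeBase("host1.foo.com", "/dc1/rack1");
    topology.add(host);

    // Look the host up by its full path: network location + "/" + name.
    Node found = topology.getNode("/dc1/rack1/host1.foo.com");
    System.out.println(found.getName());              // host1.foo.com
    System.out.println(found.getNetworkLocation());   // /dc1/rack1
    System.out.println(NodeBase.getPath(found));      // /dc1/rack1/host1.foo.com
  }
}
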
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/ScriptBasedMapping.java?rev=632035&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/ScriptBasedMapping.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/ScriptBasedMapping.java Thu Feb 28 08:04:34 2008
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.util.*;
+import java.io.*;
+import java.net.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.conf.*;
+
+/**
+ * This class implements the {@link DNSToSwitchMapping} interface using a 
+ * script configured via topology.script.file.name.
+ */
+public final class ScriptBasedMapping implements Configurable, 
+DNSToSwitchMapping
+{
+  private String scriptName;
+  private Configuration conf;
+  private int maxArgs; //max hostnames per call of the script
+  private Map<String, String> cache = new TreeMap<String, String>();
+  private static Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.net.ScriptBasedMapping");
+  public void setConf (Configuration conf) {
+    this.scriptName = conf.get("topology.script.file.name");
+    this.maxArgs = conf.getInt("topology.script.number.args", 20);
+    this.conf = conf;
+  }
+  public Configuration getConf () {
+    return conf;
+  }
+
+  public ScriptBasedMapping() {}
+  
+  public List<String> resolve(List<String> names) {
+    List <String> m = new ArrayList<String>(names.size());
+    
+    if (scriptName == null) {
+      for (int i = 0; i < names.size(); i++) {
+        m.add(NetworkTopology.DEFAULT_RACK);
+      }
+      return m;
+    }
+    List<String> hosts = new ArrayList<String>(names.size());
+    for (String name : names) {
+      name = getHostName(name);
+      if (cache.get(name) == null) {
+        hosts.add(name);
+      } 
+    }
+    
+    int i = 0;
+    String output = runResolveCommand(hosts);
+    if (output != null) {
+      StringTokenizer allSwitchInfo = new StringTokenizer(output);
+      while (allSwitchInfo.hasMoreTokens()) {
+        String switchInfo = allSwitchInfo.nextToken();
+        cache.put(hosts.get(i++), switchInfo);
+      }
+    }
+    for (String name : names) {
+      //now everything is in the cache
+      name = getHostName(name);
+      if (cache.get(name) != null) {
+        m.add(cache.get(name));
+      } else { //resolve all or nothing
+        return null;
+      }
+    }
+    return m;
+  }
+  
+  private String runResolveCommand(List<String> args) {
+    InetAddress ipaddr = null;
+    int loopCount = 0;
+    if (args.size() == 0) {
+      return null;
+    }
+    StringBuffer allOutput = new StringBuffer();
+    int numProcessed = 0;
+    while (numProcessed != args.size()) {
+      int start = maxArgs * loopCount;
+      List <String> cmdList = new ArrayList<String>();
+      cmdList.add(scriptName);
+      for (numProcessed = start; numProcessed < (start + maxArgs) && 
+           numProcessed < args.size(); numProcessed++) {
+        try {
+          ipaddr = InetAddress.getByName(args.get(numProcessed));
+        } catch (UnknownHostException uh) {
+          return null;
+        }
+        cmdList.add(ipaddr.getHostAddress()); 
+      }
+      File dir = null;
+      String userDir;
+      if ((userDir = System.getProperty("user.dir")) != null) {
+        dir = new File(userDir);
+      }
+      ShellCommandExecutor s = new ShellCommandExecutor(
+                                   cmdList.toArray(new String[0]), dir);
+      try {
+        s.execute();
+        allOutput.append(s.getOutput() + " ");
+      } catch (Exception e) {
+        LOG.warn(StringUtils.stringifyException(e));
+        return null;
+      }
+      loopCount++; 
+    }
+    return allOutput.toString();
+  }
+  private String getHostName(String hostWithPort) {
+    int j;
+    if ((j = hostWithPort.indexOf(':')) != -1) {
+      hostWithPort = hostWithPort.substring(0, j);
+    }
+    return hostWithPort;
+  }
+}

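A hedged example of wiring up the script-based resolver (the script path is a placeholder; the script itself must print one rack path per argument). With no script configured, every name resolves to /default-rack; if the script or a DNS lookup fails, resolve() returns null.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.ScriptBasedMapping;

public class ScriptMappingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Site-specific executable that prints one rack path (e.g. "/dc1/rack7")
    // per argument; the path here is only a placeholder.
    conf.set("topology.script.file.name", "/etc/hadoop/topology.sh");
    conf.setInt("topology.script.number.args", 20);

    ScriptBasedMapping mapping = new ScriptBasedMapping();
    mapping.setConf(conf);

    List<String> racks =
      mapping.resolve(Arrays.asList("host1.foo.com", "host2.foo.com"));
    System.out.println(racks);   // one rack path per host, or null on failure
  }
}
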
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Feb 28 08:04:34 2008
@@ -18,7 +18,11 @@
 package org.apache.hadoop.dfs;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.BufferedOutputStream;
+import java.io.PipedOutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,6 +30,7 @@
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.*;
 import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,11 +60,11 @@
   private Configuration conf;
   private NameNode nameNode;
   private int numDataNodes;
-  private int curDatanodesNum = 0;
   private ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
+  private DNSToSwitchMapping dnsToSwitchMapping;
   
   
   /**
@@ -85,7 +90,7 @@
   public MiniDFSCluster(Configuration conf,
                         int numDataNodes,
                         StartupOption nameNodeOperation) throws IOException {
-    this(0, conf, numDataNodes, false, false, nameNodeOperation, null);
+    this(0, conf, numDataNodes, false, false, nameNodeOperation, null, null, null);
   }
   
   /**
@@ -105,7 +110,28 @@
                         int numDataNodes,
                         boolean format,
                         String[] racks) throws IOException {
-    this(0, conf, numDataNodes, format, true, null, racks);
+    this(0, conf, numDataNodes, format, true, null, racks, null, null);
+  }
+  
+  /**
+   * Modify the config and start up the servers.  The rpc and info ports for
+   * servers are guaranteed to use free ports.
+   * <p>
+   * NameNode and DataNode directory creation and configuration will be
+   * managed by this class.
+   *
+   * @param conf the base configuration to use in starting the servers.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostname for each DataNode
+   */
+  public MiniDFSCluster(Configuration conf,
+                        int numDataNodes,
+                        boolean format,
+                        String[] racks, String[] hosts) throws IOException {
+    this(0, conf, numDataNodes, format, true, null, racks, hosts, null);
   }
   
   /**
@@ -133,8 +159,8 @@
                         boolean manageDfsDirs,
                         StartupOption operation,
                         String[] racks) throws IOException {
-    this(0, conf, numDataNodes, format, manageDfsDirs, operation, racks, null);
- 
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, 
+        racks, null, null);
   }
 
   /**
@@ -164,6 +190,38 @@
                         StartupOption operation,
                         String[] racks,
                         long[] simulatedCapacities) throws IOException {
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, racks, null, 
+        simulatedCapacities);
+  }
+  
+  /**
+   * NOTE: if possible, the other constructors that don't have nameNode port 
+   * parameter should be used as they will ensure that the servers use free ports.
+   * <p>
+   * Modify the config and start up the servers.  
+   * 
+   * @param nameNodePort suggestion for which rpc port to use.  caller should
+   *          use getNameNodePort() to get the actual port used.
+   * @param conf the base configuration to use in starting the servers.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @param manageDfsDirs if true, the data directories for servers will be
+   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the servers.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostnames of each DataNode
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   */
+  public MiniDFSCluster(int nameNodePort, 
+                        Configuration conf,
+                        int numDataNodes,
+                        boolean format,
+                        boolean manageDfsDirs,
+                        StartupOption operation,
+                        String[] racks, String hosts[],
+                        long[] simulatedCapacities) throws IOException {
     this.conf = conf;
     try {
       UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login(conf));
@@ -201,10 +259,12 @@
                      operation == StartupOption.FORMAT ||
                      operation == StartupOption.REGULAR) ?
       new String[] {} : new String[] {"-"+operation.toString()};
+    conf.setClass("topology.node.switch.mapping.impl", 
+                   StaticMapping.class, DNSToSwitchMapping.class);
     nameNode = NameNode.createNameNode(args, conf);
     
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, simulatedCapacities);
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, simulatedCapacities);
     
     if (numDataNodes > 0) {
       while (!isClusterUp()) {
@@ -235,15 +295,17 @@
    * @param operation the operation with which to start the DataNodes.  If null
    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
    * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostnames for each DataNode
    * @param simulatedCapacities array of capacities of the simulated data nodes
    *
    * @throws IllegalStateException if NameNode has been shutdown
    */
   public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
                              boolean manageDfsDirs, StartupOption operation, 
-                             String[] racks,
+                             String[] racks, String[] hosts,
                              long[] simulatedCapacities) throws IOException {
 
+    int curDatanodesNum = dataNodes.size();
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get("dfs.blockreport.initialDelay") == null) {
       conf.setLong("dfs.blockreport.initialDelay", 0);
@@ -262,6 +324,18 @@
       throw new IllegalArgumentException( "The length of racks [" + racks.length
           + "] is less than the number of datanodes [" + numDataNodes + "].");
     }
+    if (hosts != null && numDataNodes > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+    //Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      System.out.println("Generating host names for datanodes");
+      hosts = new String[numDataNodes];
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+        hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+      }
+    }
 
     if (simulatedCapacities != null 
         && numDataNodes > simulatedCapacities.length) {
@@ -271,8 +345,8 @@
     }
 
     // Set up the right ports for the datanodes
-    conf.set("dfs.datanode.address", "0.0.0.0:0");
-    conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+    conf.set("dfs.datanode.address", "127.0.0.1:0");
+    conf.set("dfs.datanode.http.address", "127.0.0.1:0");
     
     String[] args = (operation == null ||
                      operation == StartupOption.FORMAT ||
@@ -293,9 +367,6 @@
         }
         dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath()); 
       }
-      if (racks != null) {
-        dnConf.set("dfs.datanode.rack", racks[i-curDatanodesNum]);
-      }
       if (simulatedCapacities != null) {
         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
@@ -303,13 +374,38 @@
       }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
+      if (hosts != null) {
+        dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
+        System.out.println("Starting DataNode " + i + " with hostname set to: " 
+                           + dnConf.get("slave.host.name"));
+      }
+      if (racks != null) {
+        String name = hosts[i - curDatanodesNum];
+        System.out.println("Adding node with hostname : " + name + " to rack "+
+                            racks[i-curDatanodesNum]);
+        StaticMapping.addNodeToRack(name, racks[i-curDatanodesNum]);
+      }
       Configuration newconf = new Configuration(dnConf); // save config
-      dataNodes.add(new DataNodeProperties(
-                     DataNode.createDataNode(dnArgs, dnConf), 
-                     newconf, dnArgs));
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+      }
+      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+      //since the HDFS does things based on IP:port, we need to add the mapping
+      //for IP:port to rackId
+      String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
+      if (racks != null) {
+        int port = dn.getSelfAddr().getPort();
+        System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
+                            " to rack " + racks[i-curDatanodesNum]);
+        StaticMapping.addNodeToRack(ipAddr + ":" + port,
+                                  racks[i-curDatanodesNum]);
+      }
+      DataNode.runDatanodeDaemon(dn);
+      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
+    waitActive();
   }
   
   
@@ -334,10 +430,40 @@
       boolean manageDfsDirs, StartupOption operation, 
       String[] racks
       ) throws IOException {
-    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null);
+    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null, null);
   }
   
   /**
+   * Modify the config and start up additional DataNodes.  The info port for
+   * DataNodes is guaranteed to use a free port.
+   *  
+   *  Data nodes can run with the name node in the mini cluster or
+   *  a real name node. For example, running with a real name node is useful
+   *  when running simulated data nodes with a real name node.
+   *  If minicluster's name node is null assume that the conf has been
+   *  set with the right address:port of the name node.
+   *
+   * @param conf the base configuration to use in starting the DataNodes.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param manageDfsDirs if true, the data directories for DataNodes will be
+   *          created and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the DataNodes.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   *
+   * @throws IllegalStateException if NameNode has been shutdown
+   */
+  public void startDataNodes(Configuration conf, int numDataNodes, 
+                             boolean manageDfsDirs, StartupOption operation, 
+                             String[] racks,
+                             long[] simulatedCapacities) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
+                   simulatedCapacities);
+    
+  }
+  /**
    * If the NameNode is running, attempt to finalize a previous upgrade.
    * When this method return, the NameNode should be finalized, but
    * DataNodes may not be since that occurs asynchronously.
@@ -512,15 +638,32 @@
     InetSocketAddress addr = new InetSocketAddress("localhost",
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] dnInfos;
 
     // make sure all datanodes are alive
-    while( client.datanodeReport(DatanodeReportType.LIVE).length
+    while((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
         != numDataNodes) {
       try {
         Thread.sleep(500);
       } catch (Exception e) {
       }
     }
+    int numResolved = 0;
+    do {
+      numResolved = 0;
+      for (DatanodeInfo info : dnInfos) {
+        if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
+          numResolved++;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (Exception e) {
+          }
+          dnInfos = client.datanodeReport(DatanodeReportType.LIVE);
+          break;
+        }
+      }
+    } while (numResolved != numDataNodes);
 
     client.close();
   }

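The new constructor lets tests place simulated datanodes on specific racks under fake hostnames; the cluster registers each host/rack pair with StaticMapping, maps the fake names to localhost, and waits until the namenode has resolved all of them. A sketch of how a rack-aware test might use it (hostnames and rack paths are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;

public class RackAwareMiniClusterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] racks = {"/rack1", "/rack1", "/rack2"};
    String[] hosts = {"host1.foo.com", "host2.foo.com", "host3.foo.com"};

    // Three datanodes spread over two simulated racks.
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, racks, hosts);
    try {
      // rack-aware test logic goes here
    } finally {
      cluster.shutdown();
    }
  }
}
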
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Thu Feb 28 08:04:34 2008
@@ -60,7 +60,7 @@
  * Then the benchmark executes the specified number of operations using 
  * the specified number of threads and outputs the resulting stats.
  */
-public class NNThroughputBenchmark {
+public class NNThroughputBenchmark implements FSConstants {
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
   private static final int BLOCK_SIZE = 16;
 
@@ -568,10 +568,8 @@
       NamespaceInfo nsInfo = nameNode.versionRequest();
       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
       DataNode.setNewStorageID(dnRegistration);
-      // get network location
-      String networkLoc = NetworkTopology.DEFAULT_RACK;
       // register datanode
-      dnRegistration = nameNode.register(dnRegistration, networkLoc);
+      dnRegistration = nameNode.register(dnRegistration);
     }
 
     void sendHeartbeat() throws IOException {
@@ -677,6 +675,24 @@
         datanodes[idx].sendHeartbeat();
         prevDNName = datanodes[idx].dnRegistration.getName();
       }
+      int numResolved = 0;
+      DatanodeInfo[] dnInfos = nameNode.getDatanodeReport(DatanodeReportType.ALL);
+      do {
+        numResolved = 0;
+        for (DatanodeInfo info : dnInfos) {
+          if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
+            numResolved++;
+          } else {
+            try {
+              Thread.sleep(2);
+            } catch (Exception e) {
+            }
+            dnInfos = nameNode.getDatanodeReport(DatanodeReportType.LIVE);
+            break;
+          }
+        }
+      } while (numResolved != nrDatanodes);
+
       // create files 
       FileGenerator nameGenerator;
       nameGenerator = new FileGenerator(getBaseDir(), 100);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java Thu Feb 28 08:04:34 2008
@@ -65,7 +65,7 @@
    */
   private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
                                           Path file) throws IOException {
-    URL url = new URL("http://" + dn.getHostName() + ":" + dn.getInfoPort() +
+    URL url = new URL("http://localhost:" + dn.getInfoPort() +
                       "/blockScannerReport?listblocks");
     long lastWarnTime = System.currentTimeMillis();
     long verificationTime = 0;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=632035&r1=632034&r2=632035&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Feb 28 08:04:34 2008
@@ -22,6 +22,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.net.NetUtils;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -48,7 +51,7 @@
    */
   class JobTrackerRunner implements Runnable {
     private JobTracker tracker = null;
-
+    
     JobConf jc = null;
         
     public boolean isUp() {
@@ -70,6 +73,8 @@
       try {
         jc = createJobConf();
         jc.set("mapred.local.dir","build/test/mapred/local");
+        jc.setClass("topology.node.switch.mapping.impl", 
+            StaticMapping.class, DNSToSwitchMapping.class);
         tracker = JobTracker.startTracker(jc);
         tracker.offerService();
       } catch (Throwable e) {
@@ -104,11 +109,15 @@
     volatile boolean isDead = false;
     int numDir;
 
-    TaskTrackerRunner(int trackerId, int numDir) throws IOException {
+    TaskTrackerRunner(int trackerId, int numDir, String hostname) 
+    throws IOException {
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
       conf = createJobConf();
+      if (hostname != null) {
+        conf.set("slave.host.name", hostname);
+      }
       conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
       conf.set("mapred.task.tracker.report.address", 
                 "127.0.0.1:" + taskTrackerPort);
@@ -132,6 +141,14 @@
       }
       conf.set("mapred.local.dir", localPath.toString());
       LOG.info("mapred.local.dir is " +  localPath);
+      try {
+        tt = new TaskTracker(conf);
+        isInitialized = true;
+      } catch (Throwable e) {
+        isDead = true;
+        tt = null;
+        LOG.error("task tracker " + trackerId + " crashed", e);
+      }
     }
         
     /**
@@ -139,9 +156,9 @@
      */
     public void run() {
       try {
-        tt = new TaskTracker(conf);
-        isInitialized = true;
-        tt.run();
+        if (tt != null) {
+          tt.run();
+        }
       } catch (Throwable e) {
         isDead = true;
         tt = null;
@@ -162,6 +179,11 @@
     public String[] getLocalDirs(){
       return localDirs;
     } 
+    
+    public TaskTracker getTaskTracker() {
+      return tt;
+    }
+    
     /**
      * Shut down the server and wait for it to finish.
      */
@@ -226,7 +248,7 @@
     result.set("fs.default.name", namenode);
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
     result.set("mapred.job.tracker.http.address", 
-                        "0.0.0.0:" + jobTrackerInfoPort);
+                        "127.0.0.1:" + jobTrackerInfoPort);
     // for debugging have all task output sent to the test output
     JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
     return result;
@@ -239,6 +261,18 @@
    * @param numDir no. of directories
    * @throws IOException
    */
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, 
+      String[] racks, String[] hosts) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, false, numDir, racks, hosts);
+  }
+  
+  /**
+   * Create the config and the cluster.
+   * @param numTaskTrackers no. of tasktrackers in the cluster
+   * @param namenode the namenode
+   * @param numDir no. of directories
+   * @throws IOException
+   */
   public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) 
     throws IOException {
     this(0, 0, numTaskTrackers, namenode, false, numDir);
@@ -260,11 +294,49 @@
   } 
 
   public MiniMRCluster(int jobTrackerPort,
+      int taskTrackerPort,
+      int numTaskTrackers,
+      String namenode,
+      boolean taskTrackerFirst, int numDir)
+  throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+        taskTrackerFirst, numDir, null);
+  }
+  
+  public MiniMRCluster(int jobTrackerPort,
+      int taskTrackerPort,
+      int numTaskTrackers,
+      String namenode,
+      boolean taskTrackerFirst, int numDir,
+      String[] racks) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+        taskTrackerFirst, numDir, racks, null);
+  }
+  
+  public MiniMRCluster(int jobTrackerPort,
                        int taskTrackerPort,
                        int numTaskTrackers,
                        String namenode,
-                       boolean taskTrackerFirst, int numDir) throws IOException {
+                       boolean taskTrackerFirst, int numDir,
+                       String[] racks, String[] hosts) throws IOException {
 
+    if (racks != null && racks.length < numTaskTrackers) {
+      throw new IllegalArgumentException("The length of racks [" + racks.length
+          + "] is less than the number of tasktrackers [" + numTaskTrackers
+          + "].");
+    }
+    if (hosts != null && numTaskTrackers > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of tasktrackers [" + numTaskTrackers + "].");
+    }
+    //Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      System.out.println("Generating host names for tasktrackers");
+      hosts = new String[numTaskTrackers];
+      for (int i = 0; i < numTaskTrackers; i++) {
+        hosts[i] = "host" + i + ".foo.com";
+      }
+    }
     this.jobTrackerPort = jobTrackerPort;
     this.taskTrackerPort = taskTrackerPort;
     this.jobTrackerInfoPort = 0;
@@ -290,7 +362,16 @@
 
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
-      TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
+      if (racks != null) {
+        StaticMapping.addNodeToRack(hosts[idx],racks[idx]);
+      }
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[idx], "localhost");
+      }
+      TaskTrackerRunner taskTracker;
+      taskTracker = new TaskTrackerRunner(idx, numDir, 
+          hosts == null ? null : hosts[idx]);
+      
       Thread taskTrackerThread = new Thread(taskTracker);
       taskTrackerList.add(taskTracker);
       taskTrackerThreadList.add(taskTrackerThread);
@@ -311,6 +392,12 @@
     }
 
     // Wait till the MR cluster stabilizes
+    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
+      try {
+        Thread.sleep(20);
+      } catch (InterruptedException ie) {
+      }
+    }
     waitUntilIdle();
   }
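
For reference, a rough usage sketch of the new rack-aware MiniMRCluster
constructor; the hostnames, rack names and the "local" filesystem URI are
illustrative assumptions, not taken from this commit:

package org.apache.hadoop.mapred;

import java.io.IOException;

/** Sketch: bring up a two-tracker, two-rack MR cluster for a locality test. */
public class RackAwareMiniMRExample {
  public static void main(String[] args) throws IOException {
    String[] racks = { "/rack-a", "/rack-b" };                    // illustrative
    String[] hosts = { "host0.example.com", "host1.example.com" };
    // The constructor registers each host with StaticMapping/NetUtils and
    // blocks until the JobTracker has resolved every tasktracker.
    MiniMRCluster mr = new MiniMRCluster(2, "local", 1, racks, hosts);
    try {
      JobConf conf = mr.createJobConf();
      // ... submit a job against conf and inspect its locality counters ...
    } finally {
      mr.shutdown();
    }
  }
}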
     

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=632035&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Feb 28 08:04:34 2008
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestRackAwareTaskPlacement extends TestCase {
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.rack1.com"
+  };
+  private static final String rack2[] = new String[] {
+    "/r2", "/r2"
+  };
+  private static final String hosts2[] = new String[] {
+    "host1.rack2.com", "host2.rack2.com"
+  };
+  private static final String hosts3[] = new String[] {
+    "host3.rack1.com"
+  };
+  private static final String hosts4[] = new String[] {
+    "host1.rack2.com"
+  };
+  final Path inDir = new Path("/racktesting");
+  final Path outputPath = new Path("/output");
+  
+  public void testTaskPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 1;
+
+      /* Start 3 datanodes, one in rack r1 and two in r2. Create three
+       * files (splits):
+       * 1) file1, written just after starting the datanode on r1, with
+       *    a repl factor of 1, and
+       * 2) file2 & file3, written after starting the other two datanodes,
+       *    with a repl factor of 3.
+       * At the end, file1 will be present only on datanode1, while file2
+       * and file3 will be present on all datanodes.
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+      
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
+                 (dfs.getFileSystem()).getUri().getPort(); 
+      /* Run a job with the (only) tasktracker on rack2. The rack location
+       * of the tasktracker determines how many data-local/rack-local maps it
+       * runs. The hostname of the tasktracker is set to be the same as one
+       * of the datanodes.
+       */
+      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
+      JobConf jobConf = mr.createJobConf();
+      if (fileSys.exists(outputPath)) {
+        fileSys.delete(outputPath);
+      }
+      /* The job is configured with three maps since there are three 
+       * (non-splittable) files. On rack2, there are two files and both
+       * have repl of three. The blocks for those files must therefore be
+       * present on all the datanodes, in particular, the datanodes on rack2.
+       * The third input file is pulled from rack1.
+       */
+      RunningJob job = launchJob(jobConf, 3);
+      Counters counters = job.getCounters();
+      assertEquals("Number of Data-local maps", 2,
+          counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS));
+      assertEquals("Number of Rack-local maps", 0,
+          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS));
+      mr.waitUntilIdle();
+      
+      /* Run a job with the (only) tasktracker on rack1.
+       */
+      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
+      jobConf = mr.createJobConf();
+      if (fileSys.exists(outputPath)) {
+        fileSys.delete(outputPath);
+      }
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. On rack1, because of the way replication
+       * was set up while creating the files, all three files are present.
+       * Thus, a tasktracker will find all of its inputs in this rack.
+       */
+      job = launchJob(jobConf, 3);
+      counters = job.getCounters();
+      assertEquals("Number of Rack-local maps", 3,
+          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS));
+      mr.waitUntilIdle();
+      
+    } finally {
+      if (fileSys != null) { 
+        fileSys.close(); 
+      }
+      if (dfs != null) { 
+        dfs.shutdown(); 
+      }
+      if (mr != null) { 
+        mr.shutdown();
+      }
+    }
+  }
+  private void writeFile(NameNode namenode, Configuration conf, Path name, 
+      short replication) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fileSys, conf, name, 
+                                BytesWritable.class, BytesWritable.class,
+                                CompressionType.NONE);
+    writer.append(new BytesWritable(), new BytesWritable());
+    writer.close();
+    fileSys.setReplication(name, replication);
+    waitForReplication(fileSys, namenode, name, replication);
+  }
+  private void waitForReplication(FileSystem fileSys, NameNode namenode, 
+      Path name, short replication) throws IOException {
+    //wait for the replication to happen
+    boolean isReplicationDone;
+    
+    do {
+      String[][] hints = fileSys.getFileCacheHints(name, 0, Long.MAX_VALUE);
+      if (hints[0].length == replication) {
+        isReplicationDone = true;
+      } else {
+        isReplicationDone = false;  
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        return;
+      }
+    } while(!isReplicationDone);
+  }
+  private RunningJob launchJob(JobConf jobConf, int numMaps) throws IOException {
+    jobConf.setJobName("TestForRackAwareness");
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/net/StaticMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/StaticMapping.java?rev=632035&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/StaticMapping.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/StaticMapping.java Thu Feb 28 08:04:34 2008
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * Implements the {@link DNSToSwitchMapping} via static mappings. Used
+ * in testcases that simulate racks.
+ *
+ */
+public class StaticMapping extends Configured implements DNSToSwitchMapping {
+  public void setConf(Configuration conf) {
+    String[] mappings = conf.getStrings("hadoop.configured.node.mapping");
+    if (mappings != null) {
+      for (int i = 0; i < mappings.length; i++) {
+        String str = mappings[i];
+        String host = str.substring(0, str.indexOf('='));
+        String rack = str.substring(str.indexOf('=') + 1);
+        addNodeToRack(host, rack);
+      }
+    }
+  }
+  /* The map is static, so a single host-to-rack mapping is shared per JVM. */
+  private static Map<String, String> nameToRackMap = new HashMap<String, String>();
+  
+  static synchronized public void addNodeToRack(String name, String rackId) {
+    nameToRackMap.put(name, rackId);
+  }
+  public List<String> resolve(List<String> names) {
+    List<String> m = new ArrayList<String>();
+    synchronized (nameToRackMap) {
+      for (String name : names) {
+        String rackId;
+        if ((rackId = nameToRackMap.get(name)) != null) {
+          m.add(rackId);
+        } else {
+          m.add(NetworkTopology.DEFAULT_RACK);
+        }
+      }
+      return m;
+    }
+  }
+}
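
A short usage sketch of the static mapping as a test would exercise it; the
hostnames and rack paths are illustrative, and the configuration-driven path
(comma-separated host=rack entries under "hadoop.configured.node.mapping") is
not shown here:

package org.apache.hadoop.net;

import java.util.Arrays;
import java.util.List;

/** Sketch: register hosts on racks, then resolve them as the JobTracker would. */
public class StaticMappingExample {
  public static void main(String[] args) {
    // mappings are static, so they are visible to every StaticMapping instance
    StaticMapping.addNodeToRack("host1.rack1.com", "/r1");
    StaticMapping.addNodeToRack("host1.rack2.com", "/r2");

    DNSToSwitchMapping mapping = new StaticMapping();
    List<String> racks = mapping.resolve(
        Arrays.asList("host1.rack1.com", "unknown.example.com"));
    // unmapped hosts fall back to NetworkTopology.DEFAULT_RACK
    System.out.println(racks);   // e.g. [/r1, /default-rack]
  }
}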


