hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r685330 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobTracker.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Date Tue, 12 Aug 2008 21:46:25 GMT
Author: omalley
Date: Tue Aug 12 14:46:24 2008
New Revision: 685330

URL: http://svn.apache.org/viewvc?rev=685330&view=rev
Log:
HADOOP-3780. Remove asynchronous resolution of network topology in the 
JobTracker (Amar Kamat via omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Aug 12 14:46:24 2008
@@ -172,6 +172,9 @@
     HADOOP-3851. Fix spelling mistake in FSNamesystemMetrics. (Steve Loughran 
     via omalley)
 
+    HADOOP-3780. Remove asynchronous resolution of network topology in the 
+    JobTracker (Amar Kamat via omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Aug 12 14:46:24 2008
@@ -87,7 +87,6 @@
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
-  private ResolutionThread resThread = new ResolutionThread();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
   private final TaskScheduler taskScheduler;
@@ -688,7 +687,6 @@
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
-    this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     taskScheduler.start();
@@ -758,15 +756,6 @@
         ex.printStackTrace();
       }
     }
-    if (this.resThread != null) {
-      LOG.info("Stopping DNSToSwitchMapping Resolution thread");
-      this.resThread.interrupt();
-      try {
-        this.resThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1211,6 +1200,12 @@
       }
     }
       
+    // Register the tracker if its not registered
+    if (getNode(trackerName) == null) {
+      // Making the network location resolution inline .. 
+      resolveAndAddToTopology(status.getHost());
+    }
+    
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     if (!processHeartbeat(status, initialContact)) {
@@ -1390,7 +1385,6 @@
 
         if (initialContact) {
           trackerExpiryQueue.add(trackerStatus);
-          resThread.addToResolutionQueue(trackerStatus);
         }
       }
     }
@@ -1400,64 +1394,6 @@
     return true;
   }
 
-  private class ResolutionThread extends Thread {
-    private LinkedBlockingQueue<TaskTrackerStatus> queue = 
-      new LinkedBlockingQueue <TaskTrackerStatus>();
-    public ResolutionThread() {
-      setName("DNSToSwitchMapping reolution Thread");
-      setDaemon(true);
-    }
-    public void addToResolutionQueue(TaskTrackerStatus t) {
-      while (!queue.add(t)) {
-        LOG.warn("Couldn't add to the Resolution queue now. Will " +
-                 "try again");
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException ie) {}
-      }
-    }
-    @Override
-    public void run() {
-      while (!isInterrupted()) {
-        try {
-          List <TaskTrackerStatus> statuses = 
-            new ArrayList<TaskTrackerStatus>(queue.size());
-          // Block if the queue is empty
-          statuses.add(queue.take());
-          queue.drainTo(statuses);
-          List<String> dnHosts = new ArrayList<String>(statuses.size());
-          for (TaskTrackerStatus t : statuses) {
-            dnHosts.add(t.getHost());
-          }
-          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
-          if (rName == null) {
-            LOG.error("The resolve call returned null! Using " + 
-                NetworkTopology.DEFAULT_RACK + " for some hosts");
-            rName = new ArrayList<String>(dnHosts.size());
-            for (int i = 0; i < dnHosts.size(); i++) {
-              rName.add(NetworkTopology.DEFAULT_RACK);
-            }
-          }
-          int i = 0;
-          for (String m : rName) {
-            String host = statuses.get(i++).getHost();
-            String networkLoc = NodeBase.normalize(m);
-            addHostToNodeMapping(host, networkLoc);
-            numResolved++;
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn(getName() + " exiting, got interrupted: " + 
-                   StringUtils.stringifyException(ie));
-          return;
-        } catch (Throwable t) {
-          LOG.error(getName() + " got an exception: " +
-              StringUtils.stringifyException(t));
-        }
-      }
-      LOG.warn(getName() + " exiting...");
-    }
-  }
-
   /**
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Aug 12 14:46:24 2008
@@ -426,13 +426,6 @@
       taskTrackerThread.start();
     }
 
-    // Wait till the MR cluster stabilizes
-    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
-      try {
-        Thread.sleep(20);
-      } catch (InterruptedException ie) {
-      }
-    }
     waitUntilIdle();
   }
     



Mime
View raw message