Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 87254 invoked from network); 27 Feb 2007 22:35:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 27 Feb 2007 22:35:47 -0000 Received: (qmail 47071 invoked by uid 500); 27 Feb 2007 22:35:55 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 47036 invoked by uid 500); 27 Feb 2007 22:35:55 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 47026 invoked by uid 99); 27 Feb 2007 22:35:55 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Feb 2007 14:35:55 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Feb 2007 14:35:45 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id BFE381A981A; Tue, 27 Feb 2007 14:35:25 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r512461 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Date: Tue, 27 Feb 2007 22:35:25 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070227223525.BFE381A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Tue Feb 27 14:35:24 2007 New Revision: 512461 URL: http://svn.apache.org/viewvc?view=rev&rev=512461 Log: HADOOP-1042. Improve the handling of failed map output fetches. Contributed by Devaraj. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512461&r1=512460&r2=512461 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Feb 27 14:35:24 2007 @@ -147,6 +147,9 @@ 43. HADOOP-1036. Fix exception handling in TaskTracker to keep tasks from being lost. (Arun C Murthy via cutting) +44. HADOOP-1042. Improve the handling of failed map output fetches. + (Devaraj Das via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=512461&r1=512460&r2=512461 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Feb 27 14:35:24 2007 @@ -140,11 +140,6 @@ private int probe_sample_size = 50; /** - * a Random used during the map output fetching - */ - private Random randForProbing; - - /** * a hashmap from mapId to MapOutputLocation for retrials */ private Map retryFetches = new HashMap(); @@ -430,7 +425,8 @@ final int numOutputs = reduceTask.getNumMaps(); List neededOutputs = new ArrayList(numOutputs); - List knownOutputs = new ArrayList(100); + Map knownOutputs = + new HashMap(); int numInFlight = 0, numCopied = 0; int lowThreshold = numCopiers*2; long bytesTransferred = 0; @@ -440,7 +436,6 @@ //tweak the probe sample size (make it a function of numCopiers) probe_sample_size = Math.max(numCopiers*5, 50); - randForProbing = new Random(reduceTask.getPartition() * 100); for (int i = 0; i < numOutputs; i++) { neededOutputs.add(new Integer(i)); @@ -477,35 +472,38 @@ LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() + " map output location(s)"); try { + // Put the hash entries for the failed fetches. Entries here + // might be replaced by (mapId) hashkeys from new successful + // Map executions, if the fetch failures were due to lost tasks. + // The replacements, if at all, will happen when we query the + // JobTracker and put the mapId hashkeys with new MapOutputLocations + // as values + knownOutputs.putAll(retryFetches); // the call to queryJobTracker will modify fromEventId to a value // that it should be for the next call to queryJobTracker - MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient); + List locs = queryJobTracker(fromEventId, + jobClient); // remove discovered outputs from needed list // and put them on the known list - int gotLocs = (locs == null ? 0 : locs.length); - for (int i=0; i < locs.length; i++) { + int gotLocs = (locs == null ? 0 : locs.size()); + for (int i=0; i < locs.size(); i++) { // check whether we actually need an output. It could happen // that a map task that successfully ran earlier got lost, but // if we already have copied the output of that unfortunate task // we need not copy it again from the new TT (we will ignore // the event for the new rescheduled execution) - if(neededOutputs.remove(new Integer(locs[i].getMapId()))) { - // remove the mapId from the retryFetches hashmap since we now - // prefer the new location instead of what we saved earlier - retryFetches.remove(new Integer(locs[i].getMapId())); - knownOutputs.add(locs[i]); + if(neededOutputs.remove(new Integer(locs.get(i).getMapId()))) { + knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i)); } else gotLocs--; //we don't need this output } - // now put the remaining hash entries for the failed fetches - // and clear the hashmap - knownOutputs.addAll(retryFetches.values()); LOG.info(reduceTask.getTaskId() + " Got " + gotLocs + " new map outputs from jobtracker and " + retryFetches.size() + " map outputs from previous failures"); + // clear the "failed" fetches hashmap retryFetches.clear(); } catch (IOException ie) { @@ -523,7 +521,7 @@ " known map output location(s); scheduling..."); synchronized (scheduledCopies) { - ListIterator locIt = knownOutputs.listIterator(); + Iterator locIt = knownOutputs.values().iterator(); currentTime = System.currentTimeMillis(); while (locIt.hasNext()) { @@ -596,7 +594,7 @@ // the failure is due to a lost tasktracker (causes many // unnecessary backoffs). If not, we only take a small hit // polling the jobtracker a few more times - ListIterator locIt = knownOutputs.listIterator(); + Iterator locIt = knownOutputs.values().iterator(); while (locIt.hasNext()) { MapOutputLocation loc = (MapOutputLocation)locIt.next(); if (cr.getHost().equals(loc.getHost())) { @@ -716,7 +714,7 @@ * @return a set of locations to copy outputs from * @throws IOException */ - private MapOutputLocation[] queryJobTracker(IntWritable fromEventId, + private List queryJobTracker(IntWritable fromEventId, InterTrackerProtocol jobClient) throws IOException { @@ -747,11 +745,8 @@ mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port)); } } - Collections.shuffle(mapOutputsList, randForProbing); - MapOutputLocation[] locations = - new MapOutputLocation[mapOutputsList.size()]; fromEventId.set(fromEventId.get() + t.length); - return mapOutputsList.toArray(locations); + return mapOutputsList; }