Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r667041 [2/2] - in /hadoop/core/branches/branch-0.18: ./ docs/ src/core/org/apache/hadoop/io/compress/ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/
Date: Thu, 12 Jun 2008 10:47:53 -0000
To: core-commits@hadoop.apache.org
From: ddas@apache.org
Message-Id: <20080612104753.BD1B92388A08@eris.apache.org>

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=667041&r1=667040&r2=667041&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 12 03:47:52 2008
@@ -422,11 +422,6 @@
      */
     private volatile boolean exitLocalFSMerge = false;
     
-    /**
-     * A flag to indicate when to exit InMemMerge
-     */
-    private volatile boolean exitInMemMerge = false;
-    
     /**
      * When we accumulate mergeThreshold number of files in ram, we merge/spill
      */
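For reviewers: the exitInMemMerge flag deleted above becomes unnecessary because, later in this diff, InMemFSMergeThread.run() is rewritten as a do/while loop driven by the boolean that waitForDataToMerge() now returns. That shape guarantees one final merge pass after the ram manager is closed, so data queued during shutdown is not dropped. A minimal, self-contained sketch of the same shutdown pattern follows; the class name, the pending counter, and the trivial stand-in for the merge are invented for illustration and are not part of this patch.

    // Illustrative sketch (not the patched Hadoop class): the blocking wait
    // itself reports closure, so the loop drains once more after close()
    // instead of polling a separate volatile exit flag.
    public class DrainingMergeThread extends Thread {
      private boolean closed = false;
      private int pending = 0;            // stand-in for queued map outputs
      private final int threshold;        // merge once this many are queued

      public DrainingMergeThread(int threshold) {
        this.threshold = threshold;
      }

      public synchronized void add() {    // producer: queue one item
        ++pending;
        notifyAll();
      }

      public synchronized void close() {  // begin shutdown
        closed = true;
        notifyAll();
      }

      private synchronized boolean waitForDataToMerge()
          throws InterruptedException {
        while (!closed && pending < threshold) {
          wait();
        }
        return closed;                    // true => this is the last pass
      }

      private synchronized void doMerge() {
        pending = 0;                      // stand-in for the real merge work
      }

      @Override
      public void run() {
        try {
          boolean exit = false;
          do {
            exit = waitForDataToMerge();
            doMerge();                    // runs once more after close()
          } while (!exit);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }

Joining such a thread after close(), as fetchOutputs() does further down, then observes a fully drained queue.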
@@ -712,42 +707,60 @@
     class ShuffleRamManager implements RamManager {
       /* Maximum percentage of the in-memory limit that a single shuffle can 
        * consume*/
-      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
       
-      private boolean closed = false;
+      /* Maximum percentage of shuffle-threads which can be stalled 
+       * simultaneously after which a merge is triggered. 
+       */
+      private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
       
-      volatile private int numClosed = 0;
-      volatile private int size = 0;
       private final int maxSize;
       private final int maxSingleShuffleLimit;
+      private int size = 0;
+      
       private Object dataAvailable = new Object();
-      private volatile int fullSize = 0;
+      private int fullSize = 0;
+      private int numPendingRequests = 0;
+      private int numRequiredMapOutputs = 0;
+      private int numClosed = 0;
+      private boolean closed = false;
       
       public ShuffleRamManager(Configuration conf) {
         maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
-        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
+        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
       }
       
-      public synchronized boolean reserve(int requestedSize, InputStream in) {
+      public synchronized boolean reserve(int requestedSize, InputStream in) 
+      throws InterruptedException {
+        // Wait till the request can be fulfilled...
         while ((size + requestedSize) > maxSize) {
-          try {
-            // Close the connection
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ie) {
-                LOG.info("Failed to close connection with: " + ie);
-              } finally {
-                in = null;
-              }
+          
+          // Close the input...
+          if (in != null) {
+            try {
+              in.close();
+            } catch (IOException ie) {
+              LOG.info("Failed to close connection with: " + ie);
+            } finally {
+              in = null;
             }
-            
-            // Wait for memory to free up
-            wait();
-          } catch (InterruptedException ie) {}
+          }
+          
+          // Track pending requests
+          synchronized (dataAvailable) {
+            ++numPendingRequests;
+            dataAvailable.notify();
+          }
+          
+          // Wait for memory to free up
+          wait();
+          
+          // Track pending requests
+          synchronized (dataAvailable) {
+            --numPendingRequests;
+          }
         }
         
         size += requestedSize;
@@ -767,20 +780,25 @@
         notifyAll();
       }
       
-      public void waitForDataToMerge() {
+      public boolean waitForDataToMerge() throws InterruptedException {
+        boolean done = false;
         synchronized (dataAvailable) {
           while (!closed &&
                  (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-                  getReservedFiles() < 
+                  numClosed < 
                   (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
                  ) 
                  && 
-                 (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) {
-            try {
-              dataAvailable.wait();
-            } catch (InterruptedException ie) {}
+                 (mergeThreshold <= 0 || numClosed < mergeThreshold) 
+                 && 
+                 (numPendingRequests < 
+                      numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
+                  numPendingRequests < numRequiredMapOutputs)) {
+            dataAvailable.wait();
           }
+          done = closed;
         }
+        return done;
       }
       
       public void closeInMemoryFile(int requestedSize) {
@@ -791,6 +809,13 @@
         }
       }
       
+      public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
+        synchronized (dataAvailable) {
+          this.numRequiredMapOutputs = numRequiredMapOutputs;
+          dataAvailable.notify();
+        }
+      }
+      
       public void close() {
         synchronized (dataAvailable) {
           closed = true;
@@ -799,14 +824,10 @@
         }
       }
       
-      float getPercentUsed() {
+      private float getPercentUsed() {
         return (float)fullSize/maxSize;
       }
-      
-      int getReservedFiles() {
-        return numClosed;
-      }
-      
+      
       int getMemoryLimit() {
         return maxSize;
       }
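Stepping back, the reworked ShuffleRamManager amounts to a bounded memory budget with stall accounting: reserve() blocks while the budget is exhausted (closing its input first so it does not pin a connection), the notifyAll() on the memory-freeing path wakes the blocked copiers, and the numPendingRequests count published under dataAvailable lets waitForDataToMerge() see how many copiers are stalled. Below is a minimal standalone sketch of just the reservation protocol; the class and method names are invented for illustration, and the file-count and close-file bookkeeping of the real class is omitted.

    // Illustrative sketch (not the patched Hadoop class): block while over
    // budget, count stalled requesters, expose that count to an observer.
    public class BoundedRamBudget {
      private final int maxSize;            // total budget, in bytes
      private int size = 0;                 // bytes currently reserved
      private int numPendingRequests = 0;   // requesters currently blocked
      private final Object dataAvailable = new Object();

      public BoundedRamBudget(int maxSize) {
        this.maxSize = maxSize;
      }

      public synchronized void reserve(int requested)
          throws InterruptedException {
        while (size + requested > maxSize) {
          synchronized (dataAvailable) {    // publish the stall
            ++numPendingRequests;
            dataAvailable.notify();
          }
          wait();                           // until unreserve() frees space
          synchronized (dataAvailable) {
            --numPendingRequests;
          }
        }
        size += requested;
      }

      public synchronized void unreserve(int released) {
        size -= released;
        notifyAll();                        // wake blocked reserve() calls
      }

      public int pendingRequests() {        // observer side (merge thread)
        synchronized (dataAvailable) {
          return numPendingRequests;
        }
      }
    }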
@@ -978,7 +999,8 @@
       }
 
       // Note that we successfully copied the map-output
-      copiedMapOutputs.add(loc.getTaskId());
+      noteCopiedMapOutput(loc.getTaskId());
+
       return bytes;
     }
 
@@ -1004,12 +1026,22 @@
         }
 
         // Note that we successfully copied the map-output
-        copiedMapOutputs.add(loc.getTaskId());
+        noteCopiedMapOutput(loc.getTaskId());
       }
 
       return bytes;
     }
 
+    /**
+     * Save the map taskid whose output we just copied.
+     * This function assumes that it has been synchronized on ReduceTask.this.
+     * 
+     * @param taskId map taskid
+     */
+    private void noteCopiedMapOutput(TaskID taskId) {
+      copiedMapOutputs.add(taskId);
+      ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+    }
 
     /**
      * Get the map output into a local file (either in the inmemory fs or on the
@@ -1248,6 +1280,7 @@
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
+
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
@@ -1304,7 +1337,6 @@
     
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
-      final int numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = new ArrayList<MapOutputLocation>(numCopiers);
       int totalFailures = 0;
@@ -1316,7 +1348,7 @@
       LocalFSMerger localFSMergerThread = null;
       InMemFSMergeThread inMemFSMergeThread = null;
       
-      for (int i = 0; i < numOutputs; i++) {
+      for (int i = 0; i < numMaps; i++) {
         copyPhase.addPhase();       // add sub-phase per file
       }
       
@@ -1346,7 +1378,7 @@
       IntWritable fromEventId = new IntWritable(0);
       
       // loop until we get all required outputs
-      while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) {
+      while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
         
         currentTime = System.currentTimeMillis();
         boolean logNow = false;
@@ -1356,7 +1388,7 @@
         }
         if (logNow) {
           LOG.info(reduceTask.getTaskID() + " Need another " 
-                   + (numOutputs - copiedMapOutputs.size()) + " map output(s) "
+                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                    + "where " + numInFlight + " is already in progress");
         }
         
@@ -1503,7 +1535,7 @@
             float transferRate = mbs/secsSinceStart;
             
             copyPhase.startNextPhase();
-            copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
+            copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
                                 + " at " +
                                 mbpsFormat.format(transferRate) +  " MB/s)");
             
@@ -1640,7 +1672,6 @@
           mapOutputFilesOnDisk.notify();
         }
         
-        exitInMemMerge = true;
        ramManager.close();
        
        //Do a merge of in-memory files (if there are any)
@@ -1648,9 +1679,13 @@
         try {
           // Wait for the on-disk merge to complete
           localFSMergerThread.join();
+          LOG.info("Interleaved on-disk merge complete: " + 
+                   mapOutputFilesOnDisk.size() + " files left.");
           
           //wait for an ongoing merge (if it is in flight) to complete
           inMemFSMergeThread.join();
+          LOG.info("In-memory merge complete: " + 
+                   mapOutputsFilesInMemory.size() + " files left.");
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Final merge of the inmemory files threw an exception: " + 
@@ -1662,7 +1697,7 @@
           return false;
         }
       }
-      return mergeThrowable == null && copiedMapOutputs.size() == numOutputs;
+      return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
     
     private List<Segment<K, V>> createInMemorySegments() {
@@ -1908,10 +1943,11 @@
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          while (!exitInMemMerge) {
-            ramManager.waitForDataToMerge();
+          boolean exit = false;
+          do {
+            exit = ramManager.waitForDataToMerge();
             doInMemMerge();
-          }
+          } while (!exit);
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Merge of the inmemory files threw an exception: "
@@ -1923,7 +1959,6 @@
       @SuppressWarnings("unchecked")
       private void doInMemMerge() throws IOException{
         if (mapOutputsFilesInMemory.size() == 0) {
-          LOG.info("Noting to merge...");
           return;
         }
         
@@ -1953,12 +1988,16 @@
         RawKeyValueIterator rIter = null;
         final Reporter reporter = getReporter(umbilical);
         try {
+          LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
+                   " segments...");
+          
           rIter = Merger.merge(conf, localFileSys,
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter);
+          
           if (null == combinerClass) {
             Merger.writeFile(rIter, writer, reporter);
           } else {
@@ -1966,6 +2005,12 @@
             combineAndSpill(rIter, reduceCombineInputCounter);
           }
           writer.close();
+          
+          LOG.info(reduceTask.getTaskID() + 
+                   " Merge of the " + noInMemorySegments +
+                   " files in-memory complete." +
+                   " Local file is " + outputPath + " of size " + 
+                   localFileSys.getFileStatus(outputPath).getLen());
         } catch (Exception e) { 
           //make sure that we delete the ondisk file that we created 
           //earlier when we invoked cloneFileAttributes
@@ -1973,12 +2018,8 @@
           throw (IOException)new IOException
                   ("Intermedate merge failed").initCause(e);
         }
-        LOG.info(reduceTask.getTaskID() + 
-                 " Merge of the " + noInMemorySegments +
-                 " files in-memory complete." +
-                 " Local file is " + outputPath + " of size " + 
-                 localFileSys.getFileStatus(outputPath).getLen());
-        
+        
+        // Note the output of the merge
         FileStatus status = localFileSys.getFileStatus(outputPath);
         synchronized (mapOutputFilesOnDisk) {
           addToMapOutputFilesOnDisk(status);