hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r663738 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Thu, 05 Jun 2008 21:10:19 GMT
Author: acmurthy
Date: Thu Jun  5 14:10:19 2008
New Revision: 663738

URL: http://svn.apache.org/viewvc?rev=663738&view=rev
Log:
HADOOP-3326. Cleanup the local-fs and in-memory merge in the ReduceTask by spawning only one
thread each for the on-disk and in-memory merge. Contributed by Sharad Agarwal.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663738&r1=663737&r2=663738&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  5 14:10:19 2008
@@ -466,6 +466,10 @@
     HADOOP-2565. Remove DFSPath cache of FileStatus. 
     (Tsz Wo (Nicholas), SZE via hairong)
 
+    HADOOP-3326. Cleanup the local-fs and in-memory merge in the ReduceTask by
+    spawning only one thread each for the on-disk and in-memory merge.
+    (Sharad Agarwal via acmurthy)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663738&r1=663737&r2=663738&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun  5 14:10:19 2008
@@ -413,14 +413,14 @@
     private volatile Throwable mergeThrowable;
     
     /** 
-     * A flag to indicate that localFS merge is in progress
+     * A flag to indicate when to exit localFS merge
      */
-    private volatile boolean localFSMergeInProgress = false;
+    private volatile boolean exitLocalFSMerge = false;
     
     /** 
-     * A flag to indicate that merge is in progress
+     * A flag to indicate when to exit InMemMerge
      */
-    private volatile boolean mergeInProgress = false;
+    private volatile boolean exitInMemMerge = false;
     
     /**
      * When we accumulate mergeThreshold number of files in ram, we merge/spill
@@ -873,27 +873,10 @@
           if (mapOutput.inMemory) {
             // Save it in the synchronized list of map-outputs
             mapOutputsFilesInMemory.add(mapOutput);
-              
-            //Create a thread to do merges. Synchronize access/update to 
-            //mergeInProgress
-            if (!mergeInProgress && 
-                ((ramManager.getPercentUsed() >= MAX_INMEM_FILESYS_USE  && 
-                  ramManager.getReservedFiles() >= 
-                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) || 
-                (mergeThreshold > 0 && 
-                 ramManager.getReservedFiles() >= mergeThreshold)) &&
-                mergeThrowable == null) {
-              LOG.info(reduceId + " RamManager " + 
-                       " is " + ramManager.getPercentUsed() + " full with " + 
-                       mapOutputsFilesInMemory.size() + " files." +
-                       " Triggering merge");
-
-              InMemFSMergeThread m = 
-                new InMemFSMergeThread((LocalFileSystem)localFileSys);
-              m.setName("Thread for merging in-memory files");
-              m.setDaemon(true);
-              mergeInProgress = true;
-              m.start();
+            
+            //notify the InMemFSMergeThread
+            synchronized(ramManager){
+              ramManager.notify();
             }
           } else {
             // Rename the temporary file to the final file; 
@@ -908,7 +891,7 @@
             }
 
             synchronized (mapOutputFilesOnDisk) {        
-              mapOutputFilesOnDisk.add(localFileSys.getFileStatus(filename));
+              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
             }
           }
 
@@ -1188,6 +1171,8 @@
       DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
       final Progress copyPhase = 
         reduceTask.getProgress().phase();
+      LocalFSMerger localFSMergerThread = null;
+      InMemFSMergeThread inMemFSMergeThread = null;
       
       for (int i = 0; i < numOutputs; i++) {
         copyPhase.addPhase();       // add sub-phase per file
@@ -1204,6 +1189,13 @@
         copier.start();
       }
       
+      //start the on-disk-merge thread
+      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
+      //start the in memory merger thread
+      inMemFSMergeThread = new InMemFSMergeThread();
+      localFSMergerThread.start();
+      inMemFSMergeThread.start();
+      
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
@@ -1329,25 +1321,6 @@
             }
           }
           
-          // Check if a on-disk merge can be done. This will help if there
-          // are no copies to be fetched but sufficient copies to be merged.
-          synchronized (mapOutputFilesOnDisk) {
-            if (!localFSMergeInProgress
-                && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
-              // make sure that only one thread merges the disk files
-              localFSMergeInProgress = true;
-              // start the on-disk-merge process
-              LOG.info(reduceTask.getTaskID() + "We have  " + 
-                       mapOutputFilesOnDisk.size() + " map outputs on disk. " +
-                       "Triggering merge of " + ioSortFactor + " files");
-              LocalFSMerger lfsm =  
-                new LocalFSMerger((LocalFileSystem)localFileSys);
-              lfsm.setName("Thread for merging on-disk files");
-              lfsm.setDaemon(true);
-              lfsm.start();
-            }
-          }
-          
           // if we have no copies in flight and we can't schedule anything
           // new, just wait for a bit
           try {
@@ -1519,83 +1492,25 @@
           }
         }
         
+        // copiers are done, exit and notify the waiting merge threads
+        synchronized (mapOutputFilesOnDisk) {
+          exitLocalFSMerge = true;
+          mapOutputFilesOnDisk.notify();
+        }
+        synchronized (ramManager) {
+          exitInMemMerge = true;
+          ramManager.notify();
+        }
+        
         //Do a merge of in-memory files (if there are any)
         if (mergeThrowable == null) {
           try {
             // Wait for the on-disk merge to complete
-            while (localFSMergeInProgress) {
-              Thread.sleep(200);
-            }
+            localFSMergerThread.join();
             
             //wait for an ongoing merge (if it is in flight) to complete
-            while (mergeInProgress) {
-              Thread.sleep(200);
-            }
-            LOG.info(reduceTask.getTaskID() + 
-                     " Copying of all map outputs complete. " + 
-                     "Initiating the last merge on the remaining files " +
-                     "in-memory");
-            if (mergeThrowable != null) {
-              //this could happen if the merge that
-              //was in progress threw an exception
-              throw mergeThrowable;
-            }
-            //initiate merge
-            if (mapOutputsFilesInMemory.size() == 0) {
-              LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
-              		     "in-memory map-outputs");
-              return (copiedMapOutputs.size() == numOutputs);
-            }
-            //name this output file same as the name of the first file that is 
-            //there in the current list of inmem files (this is guaranteed to be
-            //absent on the disk currently. So we don't overwrite a prev. 
-            //created spill). Also we need to create the output file now since
-            //it is not guaranteed that this file will be present after merge
-            //is called (we delete empty map-output files as soon as we see them
-            //in the merge method)
-            TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-            Path outputPath = 
-              localFileSys.makeQualified(
-                  mapOutputFile.getInputFileForWrite(mapId, 
-                                                     reduceTask.getTaskID(), 
-                                                     ramfsMergeOutputSize));
-            Writer writer = 
-              new Writer(conf, localFileSys, outputPath,
-                         conf.getMapOutputKeyClass(), 
-                         conf.getMapOutputValueClass(),
-                         codec);
-            List<Segment<K, V>> inMemorySegments = createInMemorySegments();
-            int noInMemSegments = inMemorySegments.size();
-            RawKeyValueIterator rIter = null;
-            try {
-              rIter = Merger.merge(conf, localFileSys,
-                                   (Class<K>)conf.getMapOutputKeyClass(), 
-                                   (Class<V>)conf.getMapOutputValueClass(),
-                                   inMemorySegments, inMemorySegments.size(), 
-                                   new Path(reduceTask.getTaskID().toString()), 
-                                   conf.getOutputKeyComparator(), reporter); 
-
-              Merger.writeFile(rIter, writer, reporter);
-              writer.close();
-            } catch (Exception e) { 
-              //make sure that we delete the ondisk file that we created earlier
-              //when we invoked cloneFileAttributes
-              writer.close();
-              localFileSys.delete(outputPath, true);
-              throw new IOException (StringUtils.stringifyException(e));
-            }
-            LOG.info(reduceTask.getTaskID() +
-                     " Merge of the " + noInMemSegments +
-                     " files in InMemoryFileSystem complete." +
-                     " Local file is " + outputPath +
-                     " of size " + 
-                     localFileSys.getFileStatus(outputPath).getLen());
-            
-            FileStatus status = localFileSys.getFileStatus(outputPath);
-            synchronized (mapOutputFilesOnDisk) {
-              mapOutputFilesOnDisk.add(status);
-            }
-          } catch (Throwable t) {
+            inMemFSMergeThread.join();
+            } catch (Throwable t) {
             LOG.warn(reduceTask.getTaskID() +
                      " Final merge of the inmemory files threw an exception: " + 
                      StringUtils.stringifyException(t));
@@ -1642,6 +1557,13 @@
       }    
     }
     
+    private void addToMapOutputFilesOnDisk(FileStatus status) {
+      synchronized (mapOutputFilesOnDisk) {
+        mapOutputFilesOnDisk.add(status);
+        mapOutputFilesOnDisk.notify();
+      }
+    }
+    
     /** 
      * Queries the {@link TaskTracker} for a set of map-completion events from
      * a given event ID.
@@ -1734,77 +1656,93 @@
 
       public LocalFSMerger(LocalFileSystem fs) {
         this.localFileSys = fs;
+        setName("Thread for merging on-disk files");
+        setDaemon(true);
       }
 
       @SuppressWarnings("unchecked")
       public void run() {
         try {
-          List<Path> mapFiles = new ArrayList<Path>();
-          long approxOutputSize = 0;
-          int bytesPerSum = 
-            reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
-          LOG.info(reduceTask.getTaskID() 
-                   + " Merging map output files on disk");
-          // 1. Prepare the list of files to be merged. This list is prepared
-          // using a list of map output files on disk. Currently we merge
-          // io.sort.factor files into 1.
-          synchronized (mapOutputFilesOnDisk) {
-            for (int i = 0; i < ioSortFactor; ++i) {
-              FileStatus filestatus = mapOutputFilesOnDisk.first();
-              mapOutputFilesOnDisk.remove(filestatus);
-              mapFiles.add(filestatus.getPath());
-              approxOutputSize += filestatus.getLen();
+          LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
+          while(!exitLocalFSMerge){
+            synchronized (mapOutputFilesOnDisk) {
+              while (!exitLocalFSMerge &&
+                  mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
+                LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
+                mapOutputFilesOnDisk.wait();
+              }
             }
-          }
-          
-          // sanity check
-          if (mapFiles.size() == 0) {
-              return;
-          }
-          
-          // add the checksum length
-          approxOutputSize += ChecksumFileSystem
-                              .getChecksumLength(approxOutputSize,
-                                                 bytesPerSum);
-
-          // 2. Start the on-disk merge process
-          Path outputPath = 
-            lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
-                                           approxOutputSize, conf)
-            .suffix(".merged");
-          Writer writer = 
-            new Writer(conf, localFileSys, outputPath, 
-                       conf.getMapOutputKeyClass(), 
-                       conf.getMapOutputValueClass(),
-                       codec);
-          RawKeyValueIterator iter  = null;
-          Path tmpDir = new Path(reduceTask.getTaskID().toString());
-          final Reporter reporter = getReporter(umbilical);
-          try {
-            iter = Merger.merge(conf, localFileSys,
-                                conf.getMapOutputKeyClass(),
-                                conf.getMapOutputValueClass(),
-                                codec, mapFiles.toArray(new Path[mapFiles.size()]), 
-                                true, ioSortFactor, tmpDir, 
-                                conf.getOutputKeyComparator(), reporter);
-          } catch (Exception e) {
+            if(exitLocalFSMerge) {//to avoid running one extra time in the end
+              break;
+            }
+            List<Path> mapFiles = new ArrayList<Path>();
+            long approxOutputSize = 0;
+            int bytesPerSum = 
+              reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
+            LOG.info(reduceTask.getTaskID() + "We have  " + 
+                mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+                "Triggering merge of " + ioSortFactor + " files");
+            // 1. Prepare the list of files to be merged. This list is prepared
+            // using a list of map output files on disk. Currently we merge
+            // io.sort.factor files into 1.
+            synchronized (mapOutputFilesOnDisk) {
+              for (int i = 0; i < ioSortFactor; ++i) {
+                FileStatus filestatus = mapOutputFilesOnDisk.first();
+                mapOutputFilesOnDisk.remove(filestatus);
+                mapFiles.add(filestatus.getPath());
+                approxOutputSize += filestatus.getLen();
+              }
+            }
+            
+            // sanity check
+            if (mapFiles.size() == 0) {
+                return;
+            }
+            
+            // add the checksum length
+            approxOutputSize += ChecksumFileSystem
+                                .getChecksumLength(approxOutputSize,
+                                                   bytesPerSum);
+  
+            // 2. Start the on-disk merge process
+            Path outputPath = 
+              lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
+                                             approxOutputSize, conf)
+              .suffix(".merged");
+            Writer writer = 
+              new Writer(conf, localFileSys, outputPath, 
+                         conf.getMapOutputKeyClass(), 
+                         conf.getMapOutputValueClass(),
+                         codec);
+            RawKeyValueIterator iter  = null;
+            Path tmpDir = new Path(reduceTask.getTaskID().toString());
+            final Reporter reporter = getReporter(umbilical);
+            try {
+              iter = Merger.merge(conf, localFileSys,
+                                  conf.getMapOutputKeyClass(),
+                                  conf.getMapOutputValueClass(),
+                                  codec, mapFiles.toArray(new Path[mapFiles.size()]), 
+                                  true, ioSortFactor, tmpDir, 
+                                  conf.getOutputKeyComparator(), reporter);
+            } catch (Exception e) {
+              writer.close();
+              localFileSys.delete(outputPath, true);
+              throw new IOException (StringUtils.stringifyException(e));
+            }
+            Merger.writeFile(iter, writer, reporter);
             writer.close();
-            localFileSys.delete(outputPath, true);
-            throw new IOException (StringUtils.stringifyException(e));
-          }
-          Merger.writeFile(iter, writer, reporter);
-          writer.close();
-          
-          synchronized (mapOutputFilesOnDisk) {
-            mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
-          }
-          
-          LOG.info(reduceTask.getTaskID() +
-                   " Finished merging " + mapFiles.size() + 
-                   " map output files on disk of total-size " + 
-                   approxOutputSize + "." + 
-                   " Local output file is " + outputPath + " of size " +
-                   localFileSys.getFileStatus(outputPath).getLen());
+            
+            synchronized (mapOutputFilesOnDisk) {
+              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
+            }
+            
+            LOG.info(reduceTask.getTaskID() +
+                     " Finished merging " + mapFiles.size() + 
+                     " map output files on disk of total-size " + 
+                     approxOutputSize + "." + 
+                     " Local output file is " + outputPath + " of size " +
+                     localFileSys.getFileStatus(outputPath).getLen());
+            }
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID()
                    + " Merging of the local FS files threw an exception: "
@@ -1812,94 +1750,126 @@
           if (mergeThrowable == null) {
             mergeThrowable = t;
           }
-        } finally {
-          localFSMergeInProgress = false;
-        }
+        } 
       }
     }
 
     private class InMemFSMergeThread extends Thread {
-      private LocalFileSystem localFileSys;
       
-      public InMemFSMergeThread( LocalFileSystem localFileSys) {
-        this.localFileSys = localFileSys;
+      public InMemFSMergeThread() {
+        setName("Thread for merging in memory files");
+        setDaemon(true);
       }
       
       @SuppressWarnings("unchecked")
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          if (mapOutputsFilesInMemory.size() >= 
-                 (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
-            //name this output file same as the name of the first file that is 
-            //there in the current list of inmem files (this is guaranteed to
-            //be absent on the disk currently. So we don't overwrite a prev. 
-            //created spill). Also we need to create the output file now since
-            //it is not guaranteed that this file will be present after merge
-            //is called (we delete empty files as soon as we see them
-            //in the merge method)
-
-            //figure out the mapId 
-            TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-            
-            Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
-                              reduceTask.getTaskID(), ramfsMergeOutputSize);
-
-            Writer writer = 
-              new Writer(conf, localFileSys, outputPath,
-                         conf.getMapOutputKeyClass(),
-                         conf.getMapOutputValueClass(),
-                         codec);
-            
-            List<Segment<K, V>> inMemorySegments = createInMemorySegments();
-            int noInMemorySegments = inMemorySegments.size();
-            
-            RawKeyValueIterator rIter = null;
-            final Reporter reporter = getReporter(umbilical);
-            try {
-              rIter = Merger.merge(conf, localFileSys,
-                                   (Class<K>)conf.getMapOutputKeyClass(),
-                                   (Class<V>)conf.getMapOutputValueClass(),
-                                   inMemorySegments, inMemorySegments.size(),
-                                   new Path(reduceTask.getTaskID().toString()),
-                                   conf.getOutputKeyComparator(), reporter);
-              if (null == combinerClass) {
-                Merger.writeFile(rIter, writer, reporter);
-              } else {
-                combineCollector.setWriter(writer);
-                combineAndSpill(rIter, reduceCombineInputCounter);
+          while(!exitInMemMerge) {
+            synchronized(ramManager) {
+              while(!exitInMemMerge &&
+                  ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
+                  ramManager.getReservedFiles() < 
+                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) 
+                    &&
+                  (mergeThreshold <= 0 || 
+                      ramManager.getReservedFiles() < mergeThreshold))) {
+                LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
+                ramManager.wait();
               }
-            } catch (Exception e) { 
-              //make sure that we delete the ondisk file that we created 
-              //earlier when we invoked cloneFileAttributes
-              writer.close();
-              localFileSys.delete(outputPath, true);
-              throw (IOException)new IOException
-                      ("Intermedate merge failed").initCause(e);
             }
-            writer.close();
-            LOG.info(reduceTask.getTaskID() + 
-                     " Merge of the " + noInMemorySegments +
-                     " files in-memory complete." +
-                     " Local file is " + outputPath + " of size " + 
-                     localFileSys.getFileStatus(outputPath).getLen());
-            
-            FileStatus status = localFileSys.getFileStatus(outputPath);
-            synchronized (mapOutputFilesOnDisk) {
-              mapOutputFilesOnDisk.add(status);
+            if(exitInMemMerge) {//to avoid running one extra time in the end
+              break;
             }
+            LOG.info(reduceTask.getTaskID() + " RamManager " + 
+                " is " + ramManager.getPercentUsed() + " full with " + 
+                mapOutputsFilesInMemory.size() + " files." +
+                " Triggering merge");
+            if (mapOutputsFilesInMemory.size() >= 
+                   (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+              doInMemMerge();
+            }
+            else {
+              LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
+            }
+          }
+          //see if any remaining files are there for merge
+          LOG.info(reduceTask.getTaskID() + 
+              " Copying of all map outputs complete. " + 
+              "Initiating the last merge on the remaining files " +
+              "in-memory");
+          if (mapOutputsFilesInMemory.size() == 0) {
+            LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
+                     "in-memory map-outputs");
+            return;
           }
-          else {
-            LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
-          }
+          doInMemMerge();
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
-                   " Intermediate Merge of the inmemory files threw an exception: "
+                   " Merge of the inmemory files threw an exception: "
                    + StringUtils.stringifyException(t));
           ReduceCopier.this.mergeThrowable = t;
         }
-        finally {
-          mergeInProgress = false;
+      }
+      
+      @SuppressWarnings("unchecked")
+      private void doInMemMerge() throws IOException{
+        //name this output file same as the name of the first file that is 
+        //there in the current list of inmem files (this is guaranteed to
+        //be absent on the disk currently. So we don't overwrite a prev. 
+        //created spill). Also we need to create the output file now since
+        //it is not guaranteed that this file will be present after merge
+        //is called (we delete empty files as soon as we see them
+        //in the merge method)
+
+        //figure out the mapId 
+        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
+        
+        Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
+                          reduceTask.getTaskID(), ramfsMergeOutputSize);
+
+        Writer writer = 
+          new Writer(conf, localFileSys, outputPath,
+                     conf.getMapOutputKeyClass(),
+                     conf.getMapOutputValueClass(),
+                     codec);
+        
+        List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+        int noInMemorySegments = inMemorySegments.size();
+        
+        RawKeyValueIterator rIter = null;
+        final Reporter reporter = getReporter(umbilical);
+        try {
+          rIter = Merger.merge(conf, localFileSys,
+                               (Class<K>)conf.getMapOutputKeyClass(),
+                               (Class<V>)conf.getMapOutputValueClass(),
+                               inMemorySegments, inMemorySegments.size(),
+                               new Path(reduceTask.getTaskID().toString()),
+                               conf.getOutputKeyComparator(), reporter);
+          if (null == combinerClass) {
+            Merger.writeFile(rIter, writer, reporter);
+          } else {
+            combineCollector.setWriter(writer);
+            combineAndSpill(rIter, reduceCombineInputCounter);
+          }
+        } catch (Exception e) { 
+          //make sure that we delete the ondisk file that we created 
+          //earlier when we invoked cloneFileAttributes
+          writer.close();
+          localFileSys.delete(outputPath, true);
+          throw (IOException)new IOException
+                  ("Intermedate merge failed").initCause(e);
+        }
+        writer.close();
+        LOG.info(reduceTask.getTaskID() + 
+                 " Merge of the " + noInMemorySegments +
+                 " files in-memory complete." +
+                 " Local file is " + outputPath + " of size " + 
+                 localFileSys.getFileStatus(outputPath).getLen());
+        
+        FileStatus status = localFileSys.getFileStatus(outputPath);
+        synchronized (mapOutputFilesOnDisk) {
+          addToMapOutputFilesOnDisk(status);
         }
       }
     }



Mime
View raw message