hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r504676 - in /lucene/hadoop/branches/branch-0.11: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Wed, 07 Feb 2007 20:30:07 GMT
Author: cutting
Date: Wed Feb  7 12:30:07 2007
New Revision: 504676

URL: http://svn.apache.org/viewvc?view=rev&rev=504676
Log:
Merging revisions 502825:504674 from trunk to 0.11 branch, permitting trunk to diverge.

Modified:
    lucene/hadoop/branches/branch-0.11/CHANGES.txt
    lucene/hadoop/branches/branch-0.11/build.xml
    lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/branches/branch-0.11/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/CHANGES.txt?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.11/CHANGES.txt Wed Feb  7 12:30:07 2007
@@ -1,6 +1,22 @@
 Hadoop Change Log
 
 
+Branch 0.11 - unreleased
+
+ 1. HADOOP-976.  Make SequenceFile.Metadata public.  (Runping Qi via cutting)
+
+ 2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
+    with large map outputs.  (omalley via cutting)
+
+ 3. HADOOP-984.  Fix a bug in shuffle error handling introduced by
+    HADOOP-331.  If a map output is unavailable, the job tracker is
+    once more informed.  (Arun C Murthy via cutting)
+
+ 4. HADOOP-987.  Fix a problem in HDFS where blocks were not removed
+    from neededReplications after a replication target was selected.
+    (Hairong Kuang via cutting)
+
+
 Release 0.11.0 - 2007-02-02
 
  1. HADOOP-781.  Remove methods deprecated in 0.10 that are no longer
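
[Editor's example] The HADOOP-976 entry above corresponds to the SequenceFile.java change later in this commit that makes the Metadata class public. Below is a minimal sketch of what client code can now do with it; the set(Text, Text) accessor is an assumption (only the TreeMap<Text, Text> field and the Writable interface are visible in this diff), and the key/value strings are hypothetical.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class MetadataSketch {
      public static void main(String[] args) throws Exception {
        // Before HADOOP-976 this class was package-private to org.apache.hadoop.io;
        // after this change client code can construct and populate it directly.
        SequenceFile.Metadata meta = new SequenceFile.Metadata();
        // set(Text, Text) is assumed here; it is not shown in this diff.
        meta.set(new Text("generator"), new Text("example"));   // hypothetical entry
        // Metadata implements Writable, so it serializes like any other Hadoop type.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        meta.write(new DataOutputStream(bytes));
        System.out.println("serialized metadata: " + bytes.size() + " bytes");
      }
    }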

Modified: lucene/hadoop/branches/branch-0.11/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/build.xml?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/build.xml (original)
+++ lucene/hadoop/branches/branch-0.11/build.xml Wed Feb  7 12:30:07 2007
@@ -9,7 +9,7 @@
  
   <property name="Name" value="Hadoop"/>
   <property name="name" value="hadoop"/>
-  <property name="version" value="0.11.1-dev"/>
+  <property name="version" value="0.11.2-dev"/>
   <property name="final.name" value="${name}-${version}"/>
   <property name="year" value="2006"/>
   <property name="libhdfs.version" value="1"/>

Modified: lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb  7 12:30:07 2007
@@ -330,6 +330,14 @@
             return size;
         }
         
+        /* Check if a block is in the neededReplication queue */
+        synchronized boolean contains(Block block) {
+            for(TreeSet<Block> set:priorityQueues) {
+                if(set.contains(block)) return true;
+            }
+            return false;
+        }
+        
         /* Return the priority of a block
         * @param block a under replication block
         * @param curReplicas current number of replicas of the block
@@ -1867,7 +1875,9 @@
         
         // handle underReplication/overReplication
         short fileReplication = fileINode.getReplication();
-        neededReplications.update(block, curReplicaDelta, 0);
+        if(neededReplications.contains(block)) {
+            neededReplications.update(block, curReplicaDelta, 0);
+        }
         if (numCurrentReplica >= fileReplication ) {
             pendingReplications.remove(block);
         }        
@@ -2471,9 +2481,9 @@
                       (DatanodeDescriptor[]) replicateTargetSets.get(i);
             int numCurrentReplica = numCurrentReplicas.get(i).intValue();
             int numExpectedReplica = dir.getFileByBlock( block).getReplication(); 
-            neededReplications.update(
-                    block, numCurrentReplica, numExpectedReplica);
             if (numCurrentReplica + targets.length >= numExpectedReplica) {
+              neededReplications.remove(
+                      block, numCurrentReplica, numExpectedReplica);
               pendingReplications.add(block);
               NameNode.stateChangeLog.debug(
                 "BLOCK* NameSystem.pendingTransfer: "

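[Editor's example] The FSNamesystem hunks above implement the HADOOP-987 fix listed in CHANGES.txt: the update of a block's neededReplications entry is now guarded by the new contains() check, and once replication targets have been chosen the block is removed from neededReplications instead of merely updated, so it no longer lingers after a target was selected. A simplified, self-contained illustration of that check-before-update pattern follows; these are stand-in types rather than the real FSNamesystem classes, and the real remove() also takes current and expected replica counts.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    // Simplified stand-in for the needed-replications structure: one sorted set
    // per priority level, as in the priorityQueues field scanned by contains() above.
    public class NeededReplicationsSketch {
      private final List<Set<String>> priorityQueues = new ArrayList<Set<String>>();

      public NeededReplicationsSketch(int levels) {
        for (int i = 0; i < levels; i++) {
          priorityQueues.add(new TreeSet<String>());
        }
      }

      public synchronized void add(String block, int level) {
        priorityQueues.get(level).add(block);
      }

      // The new contains() check: scan every priority level for the block.
      public synchronized boolean contains(String block) {
        for (Set<String> set : priorityQueues) {
          if (set.contains(block)) {
            return true;
          }
        }
        return false;
      }

      // Called once replication targets have been chosen for the block.
      public synchronized void remove(String block) {
        for (Set<String> set : priorityQueues) {
          set.remove(block);
        }
      }

      public static void main(String[] args) {
        NeededReplicationsSketch needed = new NeededReplicationsSketch(3);
        needed.add("blk_123", 1);
        // Update path: only touch the entry if the block is still queued.
        if (needed.contains("blk_123")) {
          needed.remove("blk_123");   // stand-in for choosing targets and dropping the entry
        }
        System.out.println("still queued: " + needed.contains("blk_123"));
      }
    }
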
Modified: lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/io/SequenceFile.java Wed Feb  7 12:30:07 2007
@@ -500,7 +500,7 @@
    * pairs of Text type.
    *
    */
-  static class Metadata implements Writable {
+  public static class Metadata implements Writable {
 
     private TreeMap<Text, Text> theMetadata;
     
@@ -1815,7 +1815,7 @@
 
       int segments = sortPass(deleteInput);
       if (segments > 1) {
-        segments = mergePass();
+        segments = mergePass(outFile.getParent());
       }
     }
 
@@ -1841,9 +1841,10 @@
 
       int segments = sortPass(deleteInput);
       if (segments > 1)
-        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
+                     tempDir);
       else if (segments == 1)
-        return merge(new Path[]{outFile}, true);
+        return merge(new Path[]{outFile}, true, tempDir);
       else return null;
     }
 
@@ -2078,12 +2079,14 @@
   /**
    * Merges the list of segments of type <code>SegmentDescriptor</code>
    * @param segments the list of SegmentDescriptors
+     * @param tmpDir the directory to write temporary files into
    * @return RawKeyValueIterator
    * @throws IOException
    */
-    public RawKeyValueIterator merge(List <SegmentDescriptor> segments) 
+    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
+                                     Path tmpDir) 
     throws IOException {
-      MergeQueue mQueue = new MergeQueue(segments);
+      MergeQueue mQueue = new MergeQueue(segments, tmpDir);
       return mQueue.merge();
     }
 
@@ -2093,13 +2096,16 @@
      * @param inNames the array of path names
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
+     * @param tmpDir the directory to write temporary files into
      * @return RawKeyValueIteratorMergeQueue
      * @throws IOException
      */
-    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs) 
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
+                                     Path tmpDir) 
     throws IOException {
       return merge(inNames, deleteInputs, 
-                  (inNames.length < factor) ? inNames.length : factor);
+                   (inNames.length < factor) ? inNames.length : factor,
+                   tmpDir);
     }
 
     /**
@@ -2108,11 +2114,12 @@
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
      * @param factor the factor that will be used as the maximum merge fan-in
+     * @param tmpDir the directory to write temporary files into
      * @return RawKeyValueIteratorMergeQueue
      * @throws IOException
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
-                                     int factor) 
+                                     int factor, Path tmpDir) 
     throws IOException {
       //get the segments from inNames
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
@@ -2124,7 +2131,7 @@
         a.add(s);
       }
       this.factor = factor;
-      MergeQueue mQueue = new MergeQueue(a);
+      MergeQueue mQueue = new MergeQueue(a, tmpDir);
       return mQueue.merge();
     }
 
@@ -2153,7 +2160,7 @@
         a.add(s);
       }
       factor = (inNames.length < factor) ? inNames.length : factor;
-      MergeQueue mQueue = new MergeQueue(a);
+      MergeQueue mQueue = new MergeQueue(a, tempDir);
       return mQueue.merge();
     }
 
@@ -2232,9 +2239,8 @@
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
-      RawKeyValueIterator r = merge(inFiles, false);
-      Writer writer = cloneFileAttributes(fs, 
-              inFiles[0], outFile, null);
+      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
+      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
       
       writeFile(r, writer);
 
@@ -2242,12 +2248,12 @@
     }
 
     /** sort calls this to generate the final merged output */
-    private int mergePass() throws IOException {
+    private int mergePass(Path tmpDir) throws IOException {
       LOG.debug("running merge pass");
-      Writer writer = cloneFileAttributes(fs, 
+      Writer writer = cloneFileAttributes(
               outFile.suffix(".0"), outFile, null);
       RawKeyValueIterator r = merge(outFile.suffix(".0"), 
-                                    outFile.suffix(".0.index"));
+                                    outFile.suffix(".0.index"), tmpDir);
       writeFile(r, writer);
 
       writer.close();
@@ -2257,10 +2263,11 @@
     /** Used by mergePass to merge the output of the sort
      * @param inName the name of the input file containing sorted segments
      * @param indexIn the offsets of the sorted segments
+     * @param tmpDir the relative directory to store intermediate results in
      * @return RawKeyValueIterator
      * @throws IOException
      */
-    private RawKeyValueIterator merge(Path inName, Path indexIn) 
+    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
     throws IOException {
       //get the segments from indexIn
       //we create a SegmentContainer so that we can track segments belonging to
@@ -2268,7 +2275,7 @@
       //the contained segments during the merge process & hence don't need 
       //them anymore
       SegmentContainer container = new SegmentContainer(inName, indexIn);
-      MergeQueue mQueue = new MergeQueue(container.getSegmentList());
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir);
       return mQueue.merge();
     }
     
@@ -2282,6 +2289,7 @@
       private long totalBytesProcessed;
       private float progPerByte;
       private Progress mergeProgress = new Progress();
+      private Path tmpDir;
       
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -2298,11 +2306,18 @@
         super.put(stream);
       }
       
-      public MergeQueue(List <SegmentDescriptor> segments) {
+      /**
+       * A queue of file segments to merge
+       * @param segments the file segments to merge
+       * @param tmpDir a relative local directory to save intermediate files in
+       */
+      public MergeQueue(List <SegmentDescriptor> segments,
+                        Path tmpDir) {
         int size = segments.size();
         for (int i = 0; i < size; i++) {
           sortedSegmentSizes.put(segments.get(i), null);
         }
+        this.tmpDir = tmpDir;
       }
       protected boolean lessThan(Object a, Object b) {
         SegmentDescriptor msa = (SegmentDescriptor)a;
@@ -2389,9 +2404,12 @@
           } else {
             //we want to spread the creation of temp files on multiple disks if 
             //available
+            Path tmpFilename = 
+              new Path(tmpDir, "intermediate").suffix("." + passNo);
             Path outputFile = conf.getLocalPath("mapred.local.dir", 
-                                  (outFile.suffix("." + passNo)).toString());
-            Writer writer = cloneFileAttributes(fs, 
+                                                tmpFilename.toString());
+            LOG.info("writing intermediate results to " + outputFile);
+            Writer writer = cloneFileAttributes(
                             fs.makeQualified(mStream[0].segmentPathName), 
                             fs.makeQualified(outputFile), null);
             writer.sync = null; //disable sync for temp files
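
[Editor's example] The Sorter changes above thread a tmpDir argument through every merge path, so intermediate merge files are written under a caller-supplied relative directory (resolved against mapred.local.dir by the MergeQueue code, as the last hunk shows) instead of alongside outFile. Below is a minimal caller-side sketch of the new public merge(Path[], boolean, Path) overload; the input paths, output path, and temp-directory name are hypothetical, and the Sorter constructor used here is an assumption since it is not part of this diff.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SorterMergeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Sorter sorter =
            new SequenceFile.Sorter(fs, Text.class, Text.class, conf);

        // Already-sorted SequenceFiles to merge (hypothetical paths).
        Path[] inputs = { new Path("part0"), new Path("part1") };

        // New in this commit: the final argument names a *relative* directory for
        // intermediate merge output; MergeQueue resolves it with
        // conf.getLocalPath("mapred.local.dir", ...) so temp files are spread
        // across the configured local disks.
        SequenceFile.Sorter.RawKeyValueIterator iter =
            sorter.merge(inputs, false, new Path("merge-tmp"));

        SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
            new Path("merged.seq"), Text.class, Text.class);
        sorter.writeFile(iter, out);
        out.close();
      }
    }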

Modified: lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/MapTask.java Wed Feb  7 12:30:07 2007
@@ -495,7 +495,8 @@
           SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
               job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
               compressionType, codec);
-          sorter.writeFile(sorter.merge(segmentList), writer);
+          sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())), 
+                           writer);
           //add a sync block - required esp. for block compression to ensure
           //partition data don't span partition boundaries
           writer.sync();

Modified: lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Feb  7 12:30:07 2007
@@ -600,8 +600,9 @@
               inMemFileSys.getUri());
           return numCopied == numOutputs;
         }
-        RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
-                                                 inMemClosedFiles.length);
+        RawKeyValueIterator rIter = 
+          sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
+                       new Path(reduceTask.getTaskId()));
         //name this output file same as the name of the first file that is 
         //there in the current list of inmem files (this is guaranteed to be
         //absent on the disk currently. So we don't overwrite a prev. 
@@ -722,7 +723,7 @@
         if (inMemClosedFiles.length >= 
           (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
           RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
-              inMemClosedFiles.length);
+              inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
           //name this output file same as the name of the first file that is 
           //there in the current list of inmem files (this is guaranteed to be
           //absent on the disk currently. So we don't overwrite a prev. 

Modified: lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb  7 12:30:07 2007
@@ -1526,30 +1526,47 @@
         JobConf conf = (JobConf) context.getAttribute("conf");
         FileSystem fileSys = 
           (FileSystem) context.getAttribute("local.file.system");
-        //open index file
+
+        // Index file
         Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
-        FSDataInputStream in = fileSys.open(indexFileName);
-        //seek to the correct offset for the given reduce
-        in.seek(reduce * 16);
-        
-        //read the offset and length of the partition data
-        long startOffset = in.readLong();
-        long partLength = in.readLong();
-        
-        in.close();
+        FSDataInputStream indexIn = null;
          
+        // Map-output file
         Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
-           
-        response.setContentLength((int) partLength);
-        FSDataInputStream inStream = null;
+        FSDataInputStream mapOutputIn = null;
+        
         // true iff IOException was caused by attempt to access input
         boolean isInputException = true;
+        
         try {
-          inStream = fileSys.open(mapOutputFileName);
-          inStream.seek(startOffset);
+          /**
+           * Read the index file to get the information about where
+           * the map-output for the given reducer is available. 
+           */
+          //open index file
+          indexIn = fileSys.open(indexFileName);
+
+          //seek to the correct offset for the given reduce
+          indexIn.seek(reduce * 16);
+          
+          //read the offset and length of the partition data
+          long startOffset = indexIn.readLong();
+          long partLength = indexIn.readLong();
+
+          //set the content-length header
+          response.setContentLength((int) partLength);
+
+          /**
+           * Read the data from the single map-output file and
+           * send it to the reducer.
+           */
+          //open the map-output file
+          mapOutputIn = fileSys.open(mapOutputFileName);
+          //seek to the correct offset for the reduce
+          mapOutputIn.seek(startOffset);
           try {
             int totalRead = 0;
-            int len = inStream.read(buffer, 0,
+            int len = mapOutputIn.read(buffer, 0,
                                  partLength < MAX_BYTES_TO_READ 
                                  ? (int)partLength : MAX_BYTES_TO_READ);
             while (len > 0) {
@@ -1561,12 +1578,17 @@
               }
               totalRead += len;
               if (totalRead == partLength) break;
-              len = inStream.read(buffer, 0, 
+              len = mapOutputIn.read(buffer, 0, 
                       (partLength - totalRead) < MAX_BYTES_TO_READ
                        ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
             }
           } finally {
-            inStream.close();
+            if (indexIn != null) {
+              indexIn.close();
+            }
+            if (mapOutputIn != null) {
+              mapOutputIn.close();
+            }
           }
         } catch (IOException ie) {
           TaskTracker tracker = 
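
[Editor's example] This restructuring is the HADOOP-984 part of the merge: the index-file read now happens inside the same try block as the map-output read, so a missing or truncated index counts as an input failure (isInputException remains true) and can be reported back so the job tracker learns the map output is unavailable, and both streams are closed in the finally clause. Below is a generic, self-contained illustration of that error-attribution pattern, not the TaskTracker servlet itself; the flag handling around the response write is an assumption, since those lines fall outside this hunk.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class MapOutputServeSketch {
      // Copy a local map output to the response stream, remembering whether a
      // failure came from reading local input (so it can be reported and the map
      // re-run) or from writing to the requesting reducer.
      static void serve(String indexFile, String dataFile, OutputStream response)
          throws IOException {
        InputStream indexIn = null;
        InputStream dataIn = null;
        boolean isInputException = true;   // true iff the failure is on the input side
        try {
          indexIn = new FileInputStream(indexFile);   // failing here is an input problem
          dataIn = new FileInputStream(dataFile);
          byte[] buffer = new byte[64 * 1024];
          int len = dataIn.read(buffer);
          while (len > 0) {
            isInputException = false;
            response.write(buffer, 0, len);           // failing here is an output problem
            isInputException = true;
            len = dataIn.read(buffer);
          }
        } catch (IOException e) {
          if (isInputException) {
            // This is where the real servlet records the failure so the
            // job tracker can be told the map output is unavailable.
          }
          throw e;
        } finally {
          // Close both streams whether or not the copy succeeded.
          if (indexIn != null) {
            indexIn.close();
          }
          if (dataIn != null) {
            dataIn.close();
          }
        }
      }
    }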

Modified: lucene/hadoop/branches/branch-0.11/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.11/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=504676&r1=504675&r2=504676
==============================================================================
--- lucene/hadoop/branches/branch-0.11/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/branches/branch-0.11/src/test/org/apache/hadoop/mapred/TestMapRed.java Wed Feb  7 12:30:07 2007
@@ -559,4 +559,58 @@
         counts = Integer.parseInt(argv[i++]);
 	      launch();
     }
+    
+    public void testSmallInput(){
+      runJob(100);
+    }
+
+    public void testBiggerInput(){
+      runJob(1000);
+    }
+
+    public void runJob(int items) {
+      try {
+        JobConf conf = new JobConf(TestMapRed.class);
+        Path testdir = new Path("build/test/test.mapred.spill");
+        Path inDir = new Path(testdir, "in");
+        Path outDir = new Path(testdir, "out");
+        FileSystem fs = FileSystem.get(conf);
+        fs.delete(testdir);
+        conf.setInt("io.sort.mb", 1);
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setInputPath(inDir);
+        conf.setOutputPath(outDir);
+        conf.setMapperClass(IdentityMapper.class);
+        conf.setReducerClass(IdentityReducer.class);
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(Text.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        if (!fs.mkdirs(testdir)) {
+          throw new IOException("Mkdirs failed to create " + testdir.toString());
+        }
+        if (!fs.mkdirs(inDir)) {
+          throw new IOException("Mkdirs failed to create " + inDir.toString());
+        }
+        Path inFile = new Path(inDir, "part0");
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
+            Text.class, Text.class);
+
+        StringBuffer content = new StringBuffer();
+
+        for (int i = 0; i < 1000; i++) {
+          content.append(i).append(": This is one more line of content\n");
+        }
+
+        Text text = new Text(content.toString());
+
+        for (int i = 0; i < items; i++) {
+          writer.append(new Text("rec:" + i), text);
+        }
+        writer.close();
+
+        JobClient.runJob(conf);
+      } catch (Exception e) {
+        fail("Threw exception:" + e);
+      }
+    }
 }


