hadoop-common-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1438286 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/Merger.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java src/test/org/apache/hadoop/mapred/TestMerger.java
Date: Fri, 25 Jan 2013 01:05:13 GMT
Author: tucu
Date: Fri Jan 25 01:05:12 2013
New Revision: 1438286

URL: http://svn.apache.org/viewvc?rev=1438286&view=rev
Log:
MAPREDUCE-2264. Job status exceeds 100% in some cases. (devaraj.k and sandyr via tucu)
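
Context (a hedged summary, not part of the commit message): with compressed map outputs, Merger totalled the on-disk (compressed) segment lengths when computing merge progress, while progress itself advances by decompressed bytes read from each segment, so the reported reduce progress could climb past 100%. The patch records each segment's raw (decompressed) data length and totals that instead (see getRawDataLength() in the Merger.java hunk below). The following standalone Java sketch only illustrates the idea; SegmentInfo, effectiveLength and the numbers in main are hypothetical and are not taken from the patch.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: progress stays bounded when the total is computed
// from raw (decompressed) lengths, matching the bytes actually read.
public class MergeProgressSketch {

  static class SegmentInfo {
    final long compressedLength; // bytes on disk
    final long rawDataLength;    // bytes after decompression, -1 if unknown

    SegmentInfo(long compressedLength, long rawDataLength) {
      this.compressedLength = compressedLength;
      this.rawDataLength = rawDataLength;
    }

    // Mirrors the patch's getRawDataLength(): prefer the raw length, fall back
    // to the on-disk length when no raw length was recorded.
    long effectiveLength() {
      return rawDataLength > 0 ? rawDataLength : compressedLength;
    }
  }

  static float progress(List<SegmentInfo> segments, long decompressedBytesRead) {
    long totalBytes = 0;
    for (SegmentInfo segment : segments) {
      totalBytes += segment.effectiveLength();
    }
    if (totalBytes == 0) {
      return 1.0f; // nothing to merge
    }
    return decompressedBytesRead * (1.0f / totalBytes);
  }

  public static void main(String[] args) {
    // Two segments with roughly 4:1 compression.
    List<SegmentInfo> segments = Arrays.asList(
        new SegmentInfo(1000L, 4000L),
        new SegmentInfo(2000L, 8000L));
    // All 12,000 decompressed bytes have been read: progress is 1.0.
    // Totalling compressed lengths (3,000 bytes) would have reported 4.0, i.e. 400%.
    System.out.println(progress(segments, 12000L));
  }
}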

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestMerger.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1438286&r1=1438285&r2=1438286&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jan 25 01:05:12 2013
@@ -445,6 +445,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-4929. mapreduce.task.timeout is ignored.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-2264. Job status exceeds 100% in some cases. 
+    (devaraj.k & sandyr via tucu)
+
 Release 1.1.2 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=1438286&r1=1438285&r2=1438286&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Merger.java Fri Jan 25 01:05:12 2013
@@ -174,6 +174,7 @@ class Merger {  
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
+    long rawDataLength = -1;
     
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
@@ -181,6 +182,12 @@ class Merger {  
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
+            CompressionCodec codec, boolean preserve, long rawDataLength) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+      this.rawDataLength = rawDataLength;
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
         boolean preserve) throws IOException {
       this.conf = conf;
@@ -200,6 +207,14 @@ class Merger {  
       this.segmentLength = reader.getLength();
     }
 
+    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
+      this.reader = reader;
+      this.preserve = preserve;
+
+      this.segmentLength = reader.getLength();
+      this.rawDataLength = rawDataLength;
+    }
+
     private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
@@ -216,6 +231,9 @@ class Merger {  
         segmentLength : reader.getLength();
     }
     
+    long getRawDataLength() {
+      return (rawDataLength > 0) ? rawDataLength : getLength();
+    }
     boolean next() throws IOException {
       return reader.next(key, value);
     }
@@ -460,7 +478,7 @@ class Merger {  
           //calculating the merge progress
           long totalBytes = 0;
           for (int i = 0; i < segmentsToMerge.size(); i++) {
-            totalBytes += segmentsToMerge.get(i).getLength();
+            totalBytes += segmentsToMerge.get(i).getRawDataLength();
           }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1438286&r1=1438285&r2=1438286&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jan 25 01:05:12 2013
@@ -150,8 +150,8 @@ class ReduceTask extends Task {
   };
   
   // A sorted set for keeping a set of map output files on disk
-  private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
-    new TreeSet<FileStatus>(mapOutputFileComparator);
+  private final SortedSet<CompressAwareFileStatus> mapOutputFilesOnDisk = 
+    new TreeSet<CompressAwareFileStatus>(mapOutputFileComparator);
 
   public ReduceTask() {
     super();
@@ -1044,6 +1044,7 @@ class ReduceTask extends Task {
       byte[] data;
       final boolean inMemory;
       long compressedSize;
+      long decompressedSize;
       
       public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
                        Configuration conf, Path file, long size) {
@@ -1445,7 +1446,10 @@ class ReduceTask extends Task {
             }
 
             synchronized (mapOutputFilesOnDisk) {        
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+              FileStatus fileStatus = localFileSys.getFileStatus(filename);
+              CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                  fileStatus, mapOutput.decompressedSize);
+              addToMapOutputFilesOnDisk(compressedFileStatus);
             }
           }
 
@@ -1568,7 +1572,7 @@ class ReduceTask extends Task {
           mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
               compressedLength);
         }
-            
+        mapOutput.decompressedSize = decompressedLength;    
         return mapOutput;
       }
       
@@ -2469,9 +2473,13 @@ class ReduceTask extends Task {
               keyClass, valueClass, codec, null);
           try {
             Merger.writeFile(rIter, writer, reporter, job);
+            long decompressedBytesWritten = writer.decompressedBytesWritten;
             writer.close();
             writer = null;
-            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
+            FileStatus fileStatus = fs.getFileStatus(outputPath);
+            CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                fileStatus, decompressedBytesWritten);
+            addToMapOutputFilesOnDisk(compressedFileStatus);
           } catch (Exception e) {
             if (null != outputPath) {
               fs.delete(outputPath, true);
@@ -2497,12 +2505,17 @@ class ReduceTask extends Task {
       // segments on disk
       List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
       long onDiskBytes = inMemToDiskBytes;
-      Path[] onDisk = getMapFiles(fs, false);
-      for (Path file : onDisk) {
-        onDiskBytes += fs.getFileStatus(file).getLen();
-        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
+      long totalDecompressedBytes = inMemToDiskBytes;
+
+      for (CompressAwareFileStatus filestatus : mapOutputFilesOnDisk) {
+        long len = filestatus.getLen();
+        onDiskBytes += len;
+        diskSegments.add(new Segment<K, V>(job, fs, filestatus.getPath(),
+            codec, keepInputs, filestatus.getDecompressedSize()));
+        totalDecompressedBytes += (filestatus.getDecompressedSize() > 0) ? filestatus
+            .getDecompressedSize() : len;
       }
-      LOG.info("Merging " + onDisk.length + " files, " +
+      LOG.info("Merging " + mapOutputFilesOnDisk.size() + " files, " +
                onDiskBytes + " bytes from disk");
       Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
         public int compare(Segment<K, V> o1, Segment<K, V> o2) {
@@ -2531,7 +2544,7 @@ class ReduceTask extends Task {
           return diskMerge;
         }
         finalSegments.add(new Segment<K,V>(
-              new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+              new RawKVIteratorReader(diskMerge, onDiskBytes), true, totalDecompressedBytes));
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
@@ -2606,7 +2619,7 @@ class ReduceTask extends Task {
       }    
     }
     
-    private void addToMapOutputFilesOnDisk(FileStatus status) {
+    private void addToMapOutputFilesOnDisk(CompressAwareFileStatus status) {
       synchronized (mapOutputFilesOnDisk) {
         mapOutputFilesOnDisk.add(status);
         mapOutputFilesOnDisk.notify();
@@ -2684,6 +2697,7 @@ class ReduceTask extends Task {
                          codec, null);
             RawKeyValueIterator iter  = null;
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
+            long decompressedBytesWritten;
             try {
               iter = Merger.merge(conf, rfs,
                                   conf.getMapOutputKeyClass(),
@@ -2694,6 +2708,7 @@ class ReduceTask extends Task {
                                   spilledRecordsCounter, null);
               
               Merger.writeFile(iter, writer, reporter, conf);
+              decompressedBytesWritten = writer.decompressedBytesWritten;
               writer.close();
             } catch (Exception e) {
               localFileSys.delete(outputPath, true);
@@ -2701,7 +2716,10 @@ class ReduceTask extends Task {
             }
             
             synchronized (mapOutputFilesOnDisk) {
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
+              FileStatus fileStatus = localFileSys.getFileStatus(outputPath);
+              CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                  fileStatus, decompressedBytesWritten);
+              addToMapOutputFilesOnDisk(compressedFileStatus);
             }
             
             LOG.info(reduceTask.getTaskID() +
@@ -2784,7 +2802,7 @@ class ReduceTask extends Task {
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec, null);
-
+        long decompressedBytesWritten;
         RawKeyValueIterator rIter = null;
         try {
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
@@ -2804,6 +2822,7 @@ class ReduceTask extends Task {
             combineCollector.setWriter(writer);
             combinerRunner.combine(rIter, combineCollector);
           }
+          decompressedBytesWritten = writer.decompressedBytesWritten;
           writer.close();
 
           LOG.info(reduceTask.getTaskID() + 
@@ -2821,8 +2840,10 @@ class ReduceTask extends Task {
 
         // Note the output of the merge
         FileStatus status = localFileSys.getFileStatus(outputPath);
+        CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+            status, decompressedBytesWritten);
         synchronized (mapOutputFilesOnDisk) {
-          addToMapOutputFilesOnDisk(status);
+          addToMapOutputFilesOnDisk(compressedFileStatus);
         }
       }
     }
@@ -2976,4 +2997,18 @@ class ReduceTask extends Task {
     return Integer.numberOfTrailingZeros(hob) +
       (((hob >>> 1) & value) == 0 ? 0 : 1);
   }
+  static class CompressAwareFileStatus extends FileStatus {
+	private long decompressedSize;
+	CompressAwareFileStatus(FileStatus fileStatus, long decompressedSize) {
+	  super(fileStatus.getLen(), fileStatus.isDir(), fileStatus.getReplication(),
+				  fileStatus.getBlockSize(), fileStatus.getModificationTime(),
+				  fileStatus.getAccessTime(), fileStatus.getPermission(),
+				  fileStatus.getOwner(), fileStatus.getGroup(), fileStatus.getPath());
+	  this.decompressedSize = decompressedSize;
+	}
+
+	public long getDecompressedSize() {
+	  return decompressedSize;
+	}
+  }
 }

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestMerger.java?rev=1438286&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestMerger.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestMerger.java Fri Jan 25 01:05:12 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestMerger {
+
+  @Test
+  public void testUncompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getUncompressedSegments());
+  }
+  
+  @Test
+  public void testCompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getCompressedSegments());
+  }
+  
+  @SuppressWarnings( { "deprecation", "unchecked" })
+  public void testMergeShouldReturnProperProgress(
+      List<Segment<Text, Text>> segments) throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("localpath");
+    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
+    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
+    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
+    Counter readsCounter = new Counter();
+    Counter writesCounter = new Counter();
+    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+        valueClass, segments, 2, tmpDir, comparator, getReporter(),
+        readsCounter, writesCounter);
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
+  }
+
+  private Progressable getReporter() {
+    Progressable reporter = new Progressable() {
+      @Override
+      public void progress() {
+      }
+    };
+    return reporter;
+  }
+
+  private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 10; i++) {
+      segments.add(getUncompressedSegment(i));
+    }
+    return segments;
+  }
+
+  private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 10; i++) {
+      segments.add(getCompressedSegment(i));
+    }
+    return segments;
+  }
+  
+  private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false);
+  }
+  
+  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false, 3000l);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Reader<Text, Text> getReader(int i) throws IOException {
+    Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
+        20l);
+    when(
+        readerMock.next(any(DataInputBuffer.class), any(DataInputBuffer.class)))
+        .thenAnswer(getAnswer("Segment" + i));
+    return readerMock;
+  }
+
+  private Answer<?> getAnswer(final String segmentName) {
+    return new Answer<Object>() {
+      int i = 0;
+
+      public Boolean answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
+        DataInputBuffer value = (DataInputBuffer) args[1];
+        if (i++ == 2) {
+          return false;
+        }
+        key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
+        value.reset(("Segement Value" + segmentName + i).getBytes(), 20);
+        return true;
+      }
+    };
+  }
+}


