hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r813660 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/
Date: Fri, 11 Sep 2009 03:30:46 GMT
Author: cdouglas
Date: Fri Sep 11 03:30:44 2009
New Revision: 813660

URL: http://svn.apache.org/viewvc?rev=813660&view=rev
Log:
MAPREDUCE-830. Add support for splittable compression to TextInputFormats. Contributed by
Abdul Qadeer
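
The substance of the change: each text input format now reports a
compressed file as splittable when its codec implements
SplittableCompressionCodec, instead of treating any compressed file as
non-splittable. A minimal distillation of the check added below (field
name as in the old-API classes):

    protected boolean isSplitable(FileSystem fs, Path file) {
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      if (null == codec) {
        return true;  // uncompressed input splits as before
      }
      // block-oriented codecs (e.g. bzip2) can be split on block boundaries
      return codec instanceof SplittableCompressionCodec;
    }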

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 11 03:30:44 2009
@@ -93,6 +93,9 @@
 
     MAPREDUCE-938. Postgresql support for Sqoop. (Aaron Kimball via tomwhite)	
 
+    MAPREDUCE-830. Add support for splittable compression to TextInputFormats.
+    (Abdul Qadeer via cdouglas)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -23,7 +23,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 
 /**
  * An {@link InputFormat} for plain text files. Files are broken into lines.
@@ -46,7 +48,11 @@
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
   
   public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Sep 11 03:30:44 2009
@@ -25,12 +25,15 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -50,6 +53,7 @@
   private long end;
   private LineReader in;
   private FSDataInputStream fileIn;
+  private final Seekable filePosition;
   int maxLineLength;
   private CompressionCodec codec;
   private Decompressor decompressor;
@@ -82,14 +86,27 @@
     codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = file.getFileSystem(job);
-    fileIn = fs.open(split.getPath());
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
-      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn; // take pos from compressed stream
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        filePosition = fileIn;
+      }
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
     // because we always (except the last split) read one extra line in
@@ -107,6 +124,7 @@
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
+    filePosition = null;
   }
 
   public LineRecordReader(InputStream in, long offset, long endOffset, 
@@ -118,6 +136,7 @@
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
+    filePosition = null;
   }
   
   public LongWritable createKey() {
@@ -136,8 +155,8 @@
   }
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput()) {
-      retVal = fileIn.getPos();
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
     } else {
       retVal = pos;
     }
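
For a splittable codec the reader cannot honor the raw split offsets: the
codec snaps start/end to compression-block boundaries (hence the
reassignment from getAdjustedStart()/getAdjustedEnd() above), and progress
is measured against the compressed stream's position. The contract the
readers now depend on, in the shape exercised by the calls above (the
io.compress sources are authoritative):

    public interface SplittableCompressionCodec extends CompressionCodec {
      enum READ_MODE { CONTINUOUS, BYBLOCK }
      SplitCompressionInputStream createInputStream(
          InputStream seekableIn, Decompressor decompressor,
          long start, long end, READ_MODE readMode) throws IOException;
    }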

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -42,7 +42,11 @@
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   public RecordReader<LongWritable, Text> getRecordReader(

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -40,9 +41,12 @@
 
   @Override
   protected boolean isSplitable(JobContext context, Path file) {
-    CompressionCodec codec = 
+    final CompressionCodec codec =
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Fri Sep 11 03:30:44 2009
@@ -24,10 +24,13 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.Counter;
@@ -38,6 +41,7 @@
 import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Seekable;
 
 /**
  * Treats keys as offset in file and value as line. 
@@ -51,6 +55,7 @@
   private long end;
   private LineReader in;
   private FSDataInputStream fileIn;
+  private Seekable filePosition;
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
@@ -73,14 +78,27 @@
     codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = file.getFileSystem(job);
-    fileIn = fs.open(split.getPath());
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
-      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn;
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        filePosition = fileIn;
+      }
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
     // because we always (except the last split) read one extra line in
@@ -91,22 +109,26 @@
     this.pos = start;
   }
   
-  private boolean isCompressedInput() { return (codec != null); }
-  
+  private boolean isCompressedInput() {
+    return (codec != null);
+  }
+
   private int maxBytesToConsume() {
-    return (isCompressedInput()) ? Integer.MAX_VALUE
-                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+    return isCompressedInput()
+      ? Integer.MAX_VALUE
+      : (int) Math.min(Integer.MAX_VALUE, (end - start));
   }
   
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput()) {
-      retVal = fileIn.getPos();
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
     } else {
       retVal = pos;
     }
     return retVal;
   }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -43,9 +44,12 @@
 
   @Override
   protected boolean isSplitable(JobContext context, Path file) {
-    CompressionCodec codec = 
+    final CompressionCodec codec =
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
 }
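
Net effect of the format changes: a text file compressed with a splittable
codec no longer forces a single map task. A hedged old-API sketch (the
input path and requested split count are illustrative; the tests below
drive the same path with BZip2Codec):

    JobConf conf = new JobConf();
    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/data/input.bz2"));
    // With this patch a single .bz2 file may yield several splits, each
    // decompressed independently from an adjusted block boundary.
    InputSplit[] splits = conf.getInputFormat().getSplits(conf, 8);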

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -18,18 +18,31 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.BitSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
   private static final Log LOG =
     LogFactory.getLog(TestTextInputFormat.class.getName());
 
@@ -39,17 +52,19 @@
   private static FileSystem localFs = null; 
   static {
     try {
+      defaultConf.set("fs.default.name", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
-             "TestTextInputFormat");
-  
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestTextInputFormat").makeQualified(localFs);
+
+  @Test
   public void testFormat() throws Exception {
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(defaultConf);
     Path file = new Path(workDir, "test.txt");
 
     // A reporter that does nothing
@@ -127,6 +142,100 @@
     }
   }
 
+  @Test
+  public void testSplitableCodecs() throws IOException {
+    JobConf conf = new JobConf(defaultConf);
+    int seed = new Random().nextInt();
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+      ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(conf, workDir);
+
+    final int MAX_LENGTH = 500000;
+
+    // for a variety of lengths
+    for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+        length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+      LOG.info("creating; entries = " + length);
+
+
+      // create a file with length entries
+      Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      TextInputFormat format = new TextInputFormat();
+      format.configure(conf);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+        LOG.info("splitting: requesting = " + numSplits);
+        InputSplit[] splits = format.getSplits(conf, numSplits);
+        LOG.info("splitting: got =        " + splits.length);
+
+
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j]);
+          RecordReader<LongWritable, Text> reader =
+            format.getRecordReader(splits[j], conf, reporter);
+          try {
+            int counter = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v +
+                    " in split " + j +
+                    " at position "+reader.getPos());
+              }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              counter++;
+            }
+            if (counter > 0) {
+              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+            } else {
+              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+
+  }
+
   private static LineReader makeStream(String str) throws IOException {
     return new LineReader(new ByteArrayInputStream
                                              (str.getBytes("UTF-8")), 
@@ -137,7 +246,8 @@
                                              (str.getBytes("UTF-8")), 
                                            bufsz);
   }
-  
+
+  @Test
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -156,6 +266,7 @@
    *
    * @throws Exception
    */
+  @Test
   public void testNewLines() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +306,7 @@
    *
    * @throws Exception
    */
+  @Test
   public void testMaxLineLength() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -253,8 +365,9 @@
   /**
    * Test using the gzip codec for reading
    */
-  public static void testGzip() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzip() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
@@ -286,8 +399,9 @@
   /**
    * Test using the gzip codec and an empty input file
    */
-  public static void testGzipEmpty() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzipEmpty() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java?rev=813660&r1=813659&r2=813660&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java Fri Sep 11 03:30:44 2009
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.mapreduce.lib.input;
 
-import java.io.*;
-import java.util.*;
-import junit.framework.TestCase;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.BitSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.Configuration;
@@ -36,16 +42,21 @@
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestMRKeyValueTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestMRKeyValueTextInputFormat {
   private static final Log LOG =
     LogFactory.getLog(TestMRKeyValueTextInputFormat.class.getName());
 
-  private static int MAX_LENGTH = 10000;
-  
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null; 
   static {
     try {
+      defaultConf.set("fs.default.name", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -55,8 +66,9 @@
     new Path(new Path(System.getProperty("test.build.data", "."), "data"),
              "TestKeyValueTextInputFormat");
   
+  @Test
   public void testFormat() throws Exception {
-    Job job = new Job(new Configuration());
+    Job job = new Job(new Configuration(defaultConf));
     Path file = new Path(workDir, "test.txt");
 
     int seed = new Random().nextInt();
@@ -66,6 +78,7 @@
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
 
+    final int MAX_LENGTH = 10000;
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
          length += random.nextInt(MAX_LENGTH / 10) + 1) {
@@ -121,7 +134,10 @@
               value = reader.getCurrentValue();
               clazz = value.getClass();
               assertEquals("Value class is Text.", Text.class, clazz);
-              int v = Integer.parseInt(value.toString());
+              final int k = Integer.parseInt(key.toString());
+              final int v = Integer.parseInt(value.toString());
+              assertEquals("Bad key", 0, k % 2);
+              assertEquals("Mismatched key/value", k / 2, v);
               LOG.debug("read " + v);
               assertFalse("Key in multiple partitions.", bits.get(v));
               bits.set(v);
@@ -137,12 +153,113 @@
 
     }
   }
+
+  @Test
+  public void testSplitableCodecs() throws Exception {
+    final Job job = new Job(defaultConf);
+    final Configuration conf = job.getConfiguration();
+
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+      ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    int seed = new Random().nextInt();
+    LOG.info("seed = " + seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    final int MAX_LENGTH = 500000;
+    FileInputFormat.setMaxInputSplitSize(job, MAX_LENGTH / 20);
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 4) + 1) {
+
+      LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i * 2));
+          writer.write("\t");
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+      assertTrue("KVTIF claims not splittable", format.isSplitable(job, file));
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
+        LOG.info("splitting: requesting = " + numSplits);
+        List<InputSplit> splits = format.getSplits(job);
+        LOG.info("splitting: got =        " + splits.size());
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.size(); j++) {
+          LOG.debug("split["+j+"]= " + splits.get(j));
+          TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+          RecordReader<Text, Text> reader = format.createRecordReader(
+            splits.get(j), context);
+          Class<?> clazz = reader.getClass();
+          MapContext<Text, Text, Text, Text> mcontext =
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(),
+            context.getTaskAttemptID(), reader, null, null,
+            MapReduceTestUtil.createDummyReporter(), splits.get(j));
+          reader.initialize(splits.get(j), mcontext);
+
+          Text key = null;
+          Text value = null;
+          try {
+            int count = 0;
+            while (reader.nextKeyValue()) {
+              key = reader.getCurrentKey();
+              value = reader.getCurrentValue();
+              final int k = Integer.parseInt(key.toString());
+              final int v = Integer.parseInt(value.toString());
+              assertEquals("Bad key", 0, k % 2);
+              assertEquals("Mismatched key/value", k / 2, v);
+              LOG.debug("read " + k + "," + v);
+              assertFalse(k + "," + v + " in multiple partitions.",bits.get(v));
+              bits.set(v);
+              count++;
+            }
+            if (count > 0) {
+              LOG.info("splits["+j+"]="+splits.get(j)+" count=" + count);
+            } else {
+              LOG.debug("splits["+j+"]="+splits.get(j)+" count=" + count);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
   private LineReader makeStream(String str) throws IOException {
     return new LineReader(new ByteArrayInputStream
                                            (str.getBytes("UTF-8")), 
                                            defaultConf);
   }
   
+  @Test
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -154,6 +271,7 @@
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
 
+  @Test
   public void testNewLines() throws Exception {
     LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
     Text out = new Text();
@@ -208,8 +326,9 @@
   /**
    * Test using the gzip codec for reading
    */
-  public static void testGzip() throws IOException, InterruptedException {
-    Configuration conf = new Configuration();
+  @Test
+  public void testGzip() throws IOException, InterruptedException {
+    Configuration conf = new Configuration(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, conf);
     localFs.delete(workDir, true);


