hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r1363536 [2/2] - in /hadoop/common/branches/branch-1.1: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/io/compress/ src/core/org/apache/hadoop/io/compress/bzip2/ src/...
Date: Thu, 19 Jul 2012 20:47:49 GMT
Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1363536&r1=1363535&r2=1363536&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/io/compress/TestCodec.java Thu Jul 19 20:47:48 2012
@@ -21,6 +21,7 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -37,11 +38,11 @@ import java.util.Random;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-import junit.framework.TestCase;
-
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -59,32 +60,39 @@ import org.apache.hadoop.io.compress.zli
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestCodec extends TestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestCodec {
 
   private static final Log LOG= LogFactory.getLog(TestCodec.class);
 
   private Configuration conf = new Configuration();
   private int count = 10000;
   private int seed = new Random().nextInt();
-  
+
+  @Test
   public void testDefaultCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DefaultCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
   }
-  
+
+  @Test
   public void testGzipCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
   }
-  
+
+  @Test
   public void testBZip2Codec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
 
-  
+  @Test
   public void testSnappyCodec() throws IOException {
     if (LoadSnappy.isAvailable()) {
       if (LoadSnappy.isLoaded()) {
@@ -97,6 +105,7 @@ public class TestCodec extends TestCase 
     }
   }
 
+  @Test
   public void testGzipCodecWithParam() throws IOException {
     Configuration conf = new Configuration(this.conf);
     ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
@@ -130,10 +139,7 @@ public class TestCodec extends TestCase 
       key.write(data);
       value.write(data);
     }
-    DataInputBuffer originalData = new DataInputBuffer();
-    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    originalData.reset(data.getData(), 0, data.getLength());
-    
+
     LOG.info("Generated " + count + " records");
     
     // Compress data
@@ -157,6 +163,9 @@ public class TestCodec extends TestCase 
       new DataInputStream(new BufferedInputStream(inflateFilter));
 
     // Check
+    DataInputBuffer originalData = new DataInputBuffer();
+    originalData.reset(data.getData(), 0, data.getLength());
+    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
  
     for(int i=0; i < count; ++i) {
       RandomDatum k1 = new RandomDatum();
       RandomDatum v1 = new RandomDatum();
@@ -170,9 +179,129 @@ public class TestCodec extends TestCase 
       assertTrue("original and compressed-then-decompressed-output not equal",
                  k1.equals(k2) && v1.equals(v2));
     }
+
+    // De-compress data byte-at-a-time
+    originalData.reset(data.getData(), 0, data.getLength());
+    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
+        compressedDataBuffer.getLength());
+    inflateFilter = 
+        codec.createInputStream(deCompressedDataBuffer);
+
+    // Check
+    originalIn = new DataInputStream(new BufferedInputStream(originalData));
+    int expected;
+    do {
+      expected = originalIn.read();
+      assertEquals("Inflated stream read by byte does not match",
+          expected, inflateFilter.read());
+    } while (expected != -1);
+
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
 
+  @Test
+  public void testSplitableCodecs() throws Exception {
+    testSplitableCodec(BZip2Codec.class);
+  }
+
+  private void testSplitableCodec(
+      Class<? extends SplittableCompressionCodec> codecClass)
+      throws IOException {
+    final long DEFLBYTES = 2 * 1024 * 1024;
+    final Configuration conf = new Configuration();
+    final Random rand = new Random();
+    final long seed = rand.nextLong();
+    LOG.info("seed: " + seed);
+    rand.setSeed(seed);
+    SplittableCompressionCodec codec =
+      ReflectionUtils.newInstance(codecClass, conf);
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final FileStatus infile =
+      fs.getFileStatus(writeSplitTestFile(fs, rand, codec, DEFLBYTES));
+    if (infile.getLen() > Integer.MAX_VALUE) {
+      fail("Unexpected compression: " + DEFLBYTES + " -> " + infile.getLen());
+    }
+    final int flen = (int) infile.getLen();
+    final Text line = new Text();
+    final Decompressor dcmp = CodecPool.getDecompressor(codec);
+    try {
+      for (int pos = 0; pos < infile.getLen(); pos += rand.nextInt(flen / 8)) {
+        // read from random positions, verifying that there exist two sequential
+        // lines as written in writeSplitTestFile
+        final SplitCompressionInputStream in =
+          codec.createInputStream(fs.open(infile.getPath()), dcmp,
+              pos, flen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        if (in.getAdjustedStart() >= flen) {
+          break;
+        }
+        LOG.info("SAMPLE " + in.getAdjustedStart() + "," + in.getAdjustedEnd());
+        final LineReader lreader = new LineReader(in);
+        lreader.readLine(line); // ignore; likely partial
+        if (in.getPos() >= flen) {
+          break;
+        }
+        lreader.readLine(line);
+        final int seq1 = readLeadingInt(line);
+        lreader.readLine(line);
+        if (in.getPos() >= flen) {
+          break;
+        }
+        final int seq2 = readLeadingInt(line);
+        assertEquals("Mismatched lines", seq1 + 1, seq2);
+      }
+    } finally {
+      CodecPool.returnDecompressor(dcmp);
+    }
+    // remove on success
+    fs.delete(infile.getPath().getParent(), true);
+  }
+
+  private static int readLeadingInt(Text txt) throws IOException {
+    DataInputStream in =
+      new DataInputStream(new ByteArrayInputStream(txt.getBytes()));
+    return in.readInt();
+  }
+
+  /** Write infLen bytes (deflated) to file in test dir using codec.
+   * Records are of the form
+   * &lt;i&gt;&lt;b64 rand&gt;&lt;i+i&gt;&lt;b64 rand&gt;
+   */
+  private static Path writeSplitTestFile(FileSystem fs, Random rand,
+      CompressionCodec codec, long infLen) throws IOException {
+    final int REC_SIZE = 1024;
+    final Path wd = new Path(new Path(
+          System.getProperty("test.build.data", "/tmp")).makeQualified(fs),
+        codec.getClass().getSimpleName());
+    final Path file = new Path(wd, "test" + codec.getDefaultExtension());
+    final byte[] b = new byte[REC_SIZE];
+    final Base64 b64 = new Base64(0, null);
+    DataOutputStream fout = null;
+    Compressor cmp = CodecPool.getCompressor(codec);
+    try {
+      fout = new DataOutputStream(codec.createOutputStream(
+            fs.create(file, true), cmp));
+      final DataOutputBuffer dob = new DataOutputBuffer(REC_SIZE * 4 / 3 + 4);
+      int seq = 0;
+      while (infLen > 0) {
+        rand.nextBytes(b);
+        final byte[] b64enc = b64.encode(b); // ensures rand printable, no LF
+        dob.reset();
+        dob.writeInt(seq);
+        System.arraycopy(dob.getData(), 0, b64enc, 0, dob.getLength());
+        fout.write(b64enc);
+        fout.write('\n');
+        ++seq;
+        infLen -= b64enc.length;
+      }
+      LOG.info("Wrote " + seq + " records to " + file);
+    } finally {
+      IOUtils.cleanup(LOG, fout);
+      CodecPool.returnCompressor(cmp);
+    }
+    return file;
+  }
+
+  @Test
   public void testCodecPoolGzipReuse() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean("hadoop.native.lib", true);
@@ -257,6 +386,7 @@ public class TestCodec extends TestCase 
                outbytes.length >= b.length);
   }
 
+  @Test
   public void testCodecInitWithCompressionLevel() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean("io.native.lib.available", true);
@@ -276,6 +406,7 @@ public class TestCodec extends TestCase 
                          "org.apache.hadoop.io.compress.DefaultCodec");
   }
 
+  @Test
   public void testCodecPoolCompressorReinit() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean("hadoop.native.lib", true);
@@ -289,13 +420,15 @@ public class TestCodec extends TestCase 
     DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
     gzipReinitTest(conf, dfc);
   }
-  
+
+  @Test
   public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
   }
-  
+
+  @Test
   public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, 
       InstantiationException, IllegalAccessException {
     sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
@@ -383,6 +516,7 @@ public class TestCodec extends TestCase 
 
   }
 
+  @Test
   public void testGzipCompatibility() throws IOException {
     Random r = new Random();
     long seed = r.nextLong();
@@ -450,12 +584,14 @@ public class TestCodec extends TestCase 
     assertTrue(java.util.Arrays.equals(chk, dflchk));
   }
 
+  @Test
   public void testBuiltInGzipConcat() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean("hadoop.native.lib", false);
     GzipConcatTest(conf, BuiltInGzipDecompressor.class);
   }
 
+  @Test
   public void testNativeGzipConcat() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean("hadoop.native.lib", true);
@@ -466,10 +602,7 @@ public class TestCodec extends TestCase 
     GzipConcatTest(conf, GzipCodec.GzipZlibDecompressor.class);
     }
 
-  public TestCodec(String name) {
-    super(name);
-  }
-
+  @Test
   public void testCodecPoolAndGzipDecompressor() {
     // BuiltInZlibInflater should not be used as the GzipCodec decompressor.
     // Assert that this is the case.
@@ -520,6 +653,7 @@ public class TestCodec extends TestCase 
     }
   }
 
+  @Test
   public void testGzipCodecRead() throws IOException {
     // Create a gzipped file and try to read it back, using a decompressor
     // from the CodecPool.
@@ -572,6 +706,7 @@ public class TestCodec extends TestCase 
     }
   }
 
+  @Test
   public void testGzipCodecWrite() throws IOException {
     // Create a gzipped file using a compressor from the CodecPool,
     // and try to read it back via the regular GZIPInputStream.

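For context beyond the patch itself, the following is a minimal sketch, separate from the commit, of the read-at-an-offset pattern the new testSplitableCodec exercises: open a SplittableCompressionCodec at an arbitrary byte offset and let BYBLOCK mode advance the stream to the nearest compressed block boundary. The file path and split offsets below are placeholder arguments, not values from the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;

public class SplitReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path(args[0]);         // a .bz2 file; placeholder argument
    long start = Long.parseLong(args[1]);  // arbitrary split start, in bytes
    long end = Long.parseLong(args[2]);    // arbitrary split end, in bytes

    SplittableCompressionCodec codec =
        ReflectionUtils.newInstance(BZip2Codec.class, conf);
    Decompressor dcmp = CodecPool.getDecompressor(codec);
    try {
      // BYBLOCK mode moves the stream to the first compressed block at or
      // after 'start'; getAdjustedStart()/getAdjustedEnd() report the shift.
      SplitCompressionInputStream in = codec.createInputStream(
          fs.open(file), dcmp, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      LineReader lreader = new LineReader(in);
      Text line = new Text();
      lreader.readLine(line);  // discard: likely a partial record
      while (in.getPos() < end && lreader.readLine(line) > 0) {
        System.out.println(line);
      }
      lreader.close();
    } finally {
      CodecPool.returnDecompressor(dcmp);
    }
  }
}

The first readLine after seeking is discarded because the adjusted start usually lands mid-record, which is why the test above compares two subsequent sequence numbers rather than trusting the first line it reads.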
Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=1363536&r1=1363535&r2=1363536&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Thu Jul 19 20:47:48 2012
@@ -20,16 +20,20 @@ package org.apache.hadoop.mapred;
 
 import java.io.*;
 import java.util.*;
-import junit.framework.TestCase;
 
-import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
   private static final Log LOG =
     LogFactory.getLog(TestTextInputFormat.class.getName());
 
@@ -39,17 +43,20 @@ public class TestTextInputFormat extends
   private static FileSystem localFs = null; 
   static {
     try {
+      defaultConf.set("fs.default.name", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
-             "TestTextInputFormat");
-  
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestTextInputFormat").makeQualified(localFs);
+
+  @Test
   public void testFormat() throws Exception {
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(defaultConf);
     Path file = new Path(workDir, "test.txt");
 
     // A reporter that does nothing
@@ -127,6 +134,95 @@ public class TestTextInputFormat extends
     }
   }
 
+  @Test
+  public void testSplitableCodecs() throws IOException {
+    JobConf conf = new JobConf(defaultConf);
+    int seed = new Random().nextInt();
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+      ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(conf, workDir);
+
+    final int MAX_LENGTH = 500000;
+
+    // for a variety of lengths
+    for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+        length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+      LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      TextInputFormat format = new TextInputFormat();
+      format.configure(conf);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+        LOG.info("splitting: requesting = " + numSplits);
+        InputSplit[] splits = format.getSplits(conf, numSplits);
+        LOG.info("splitting: got =        " + splits.length);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j]);
+          RecordReader<LongWritable, Text> reader =
+            format.getRecordReader(splits[j], conf, reporter);
+          try {
+            int counter = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v +
+                    " in split " + j +
+                    " at position "+reader.getPos());
+              }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              counter++;
+            }
+            if (counter > 0) {
+              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+            } else {
+              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+    }
+  }
+
   private static LineReader makeStream(String str) throws IOException {
     return new LineReader(new ByteArrayInputStream
                                              (str.getBytes("UTF-8")), 
@@ -138,6 +234,7 @@ public class TestTextInputFormat extends
                                            bufsz);
   }
   
+  @Test
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -156,6 +253,7 @@ public class TestTextInputFormat extends
    *
    * @throws Exception
    */
+  @Test
   public void testNewLines() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +293,7 @@ public class TestTextInputFormat extends
    *
    * @throws Exception
    */
+  @Test
   public void testMaxLineLength() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -253,8 +352,9 @@ public class TestTextInputFormat extends
   /**
    * Test using the gzip codec for reading
    */
-  public static void testGzip() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzip() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
@@ -286,8 +386,9 @@ public class TestTextInputFormat extends
   /**
    * Test using the gzip codec and an empty input file
    */
-  public static void testGzipEmpty() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzipEmpty() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);

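A minimal sketch, separate from the commit, of the split-coverage invariant the new testSplitableCodecs in TestTextInputFormat asserts: write one numbered record per line through the codec, split the file, and use a BitSet to check that every record turns up in exactly one split. The work directory, record count, and requested split count here are illustrative, not the test's values.

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.BitSet;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class SplitCoverageSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.getLocal(conf);
    // illustrative work dir and sizes
    Path dir = new Path(System.getProperty("test.build.data", "/tmp"), "coverage");
    BZip2Codec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);
    Path file = new Path(dir, "test" + codec.getDefaultExtension());

    final int records = 100000;
    Writer writer =
        new OutputStreamWriter(codec.createOutputStream(fs.create(file, true)));
    try {
      for (int i = 0; i < records; i++) {
        writer.write(Integer.toString(i));  // one numbered record per line
        writer.write("\n");
      }
    } finally {
      writer.close();
    }

    FileInputFormat.setInputPaths(conf, dir);
    TextInputFormat format = new TextInputFormat();
    format.configure(conf);
    InputSplit[] splits = format.getSplits(conf, 20);

    // every record number must be seen exactly once across all splits
    BitSet seen = new BitSet(records);
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (InputSplit split : splits) {
      RecordReader<LongWritable, Text> reader =
          format.getRecordReader(split, conf, Reporter.NULL);
      try {
        while (reader.next(key, value)) {
          int v = Integer.parseInt(value.toString());
          if (seen.get(v)) {
            throw new IllegalStateException("record " + v + " in two splits");
          }
          seen.set(v);
        }
      } finally {
        reader.close();
      }
    }
    if (seen.cardinality() != records) {
      throw new IllegalStateException("some records were never read");
    }
  }
}

The BitSet doubles as a duplicate detector and a completeness check, the same two-sided assertion the test makes with assertFalse on bits.get(v) and assertEquals on bits.cardinality().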
Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1363536&r1=1363535&r2=1363536&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Thu Jul 19 20:47:48 2012
@@ -97,7 +97,7 @@ public class TestMapReduceLocal extends 
       private float last = 0.0f;
       private boolean progressCalled = false;
       @Override
-      public float getProgress() {
+      public float getProgress() throws IOException {
         progressCalled = true;
         final float ret = super.getProgress();
         assertTrue("getProgress decreased", ret >= last);

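A minimal sketch, separate from the commit, of the language rule behind this one-line change: an overriding method may declare the checked exceptions of the method it overrides (or a subset), so once the superclass getProgress() declares IOException, an override that calls super.getProgress() must declare or catch it. The classes below are illustrative stand-ins, not Hadoop types.

import java.io.IOException;

abstract class ProgressSource {
  public float getProgress() throws IOException { return 0.0f; }
}

class MonotonicProgress extends ProgressSource {
  private float last = 0.0f;
  @Override
  public float getProgress() throws IOException {  // must match or narrow
    final float ret = super.getProgress();         // may throw IOException
    if (ret < last) {
      throw new IllegalStateException("getProgress decreased: " + ret);
    }
    last = ret;
    return ret;
  }
}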

