hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1137690 - in /hadoop/common/trunk/common: ./ ivy/ src/java/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/snappy/ src/test/core/org/apache/hadoop/io/compress/
Date Mon, 20 Jun 2011 16:32:28 GMT
Author: tomwhite
Date: Mon Jun 20 16:32:27 2011
New Revision: 1137690

URL: http://svn.apache.org/viewvc?rev=1137690&view=rev
Log:
HADOOP-7206. Integrate Snappy compression. Contributed by T Jake Luciani.

Added:
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java   (with props)
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java   (with props)
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java   (with props)
Modified:
    hadoop/common/trunk/common/CHANGES.txt
    hadoop/common/trunk/common/ivy.xml
    hadoop/common/trunk/common/ivy/hadoop-common-template.xml
    hadoop/common/trunk/common/ivy/libraries.properties
    hadoop/common/trunk/common/src/java/core-default.xml
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java

Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Mon Jun 20 16:32:27 2011
@@ -47,6 +47,8 @@ Trunk (unreleased changes)
     HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
     in ObjectWritable. (todd)
 
+    HADOOP-7206. Integrate Snappy compression. (T Jake Luciani via tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Modified: hadoop/common/trunk/common/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy.xml (original)
+++ hadoop/common/trunk/common/ivy.xml Mon Jun 20 16:32:27 2011
@@ -327,5 +327,9 @@
       name="protobuf-java"
       rev="${protobuf.version}"
       conf="common->default"/>
+    <dependency org="org.xerial.snappy"
+      name="snappy-java"
+      rev="${snappy-java.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/common/trunk/common/ivy/hadoop-common-template.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy/hadoop-common-template.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy/hadoop-common-template.xml (original)
+++ hadoop/common/trunk/common/ivy/hadoop-common-template.xml Mon Jun 20 16:32:27 2011
@@ -155,5 +155,10 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.4.0a</version>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3-rc2</version>
+    </dependency>
   </dependencies>
 </project>

Modified: hadoop/common/trunk/common/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy/libraries.properties?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy/libraries.properties (original)
+++ hadoop/common/trunk/common/ivy/libraries.properties Mon Jun 20 16:32:27 2011
@@ -74,6 +74,7 @@ rats-lib.version=0.6
 servlet.version=4.0.6
 servlet-api-2.5.version=6.1.14
 servlet-api.version=2.5
+snappy-java.version=1.0.3-rc2
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
 

Modified: hadoop/common/trunk/common/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/core-default.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/core-default.xml (original)
+++ hadoop/common/trunk/common/src/java/core-default.xml Mon Jun 20 16:32:27 2011
@@ -174,7 +174,7 @@
 
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.DeflateCodec</value>
   <description>A list of the compression codec classes that can be used 
                for compression/decompression.</description>
 </property>

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1137690&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java Mon Jun 20 16:32:27 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+public class SnappyCodec implements Configurable, CompressionCodec {
+  private static final Log logger = LogFactory.getLog(SnappyCodec.class
+      .getName());
+  private static boolean nativeSnappyLoaded = false;
+  private Configuration conf;
+
+  public static final String SNAPPY_BUFFER_SIZE_KEY = "io.compression.codec.snappy.buffersize";
+  public static final int DEFAULT_SNAPPY_BUFFER_SIZE = 256 * 1024;
+
+  public SnappyCodec() {
+
+  }
+
+  public SnappyCodec(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  static {
+    try {
+      if (Snappy.getNativeLibraryVersion() != null) {
+        logger
+            .info("Successfully loaded & initialized native-snappy library [snappy-java rev "
+                + Snappy.getNativeLibraryVersion() + "]");
+
+        nativeSnappyLoaded = true;
+      } else {
+        logger.info("Failed to load native-snappy library");
+      }
+
+    } catch (SnappyError e) {
+      logger.error("Native Snappy load error: ", e);
+    }
+  }
+
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return nativeSnappyLoaded;
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor) throws IOException {
+
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    int bufferSize = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE);
+
+    int compressionOverhead = Snappy.maxCompressedLength(bufferSize) - bufferSize; 
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return SnappyCompressor.class;
+  }
+
+  public Compressor createCompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyCompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor) throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        SNAPPY_BUFFER_SIZE_KEY, DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return SnappyDecompressor.class;
+  }
+
+  public Decompressor createDecompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyDecompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}

Propchange: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java?rev=1137690&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java Mon Jun 20 16:32:27 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyCompressor implements Compressor {
+  private static final Log logger = LogFactory.getLog(SnappyCompressor.class
+      .getName());
+
+  private boolean finish, finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer compressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyCompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    compressedBuf = ByteBuffer.allocateDirect(Snappy
+        .maxCompressedLength(bufferSize));
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+    // needs input if compressed data was consumed
+    if (compressedBuf.position() > 0
+        && compressedBuf.limit() > compressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  public synchronized boolean finished() {
+    // Check if all compressed data has been consumed
+    return (finish && finished);
+  }
+
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    if (finished || outBuf.position() == 0) {
+      finished = true;
+      return 0;
+    }
+
+    // Only need todo this once
+    if (compressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int lim = Snappy.compress(outBuf, compressedBuf);
+
+        compressedBuf.limit(lim);
+        compressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len
+        : (compressedBuf.limit() - compressedBuf.position());
+
+    if (n == 0) {
+      finished = true;
+      return 0;
+    }
+
+    compressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (compressedBuf.position() == compressedBuf.limit()) {
+      finished = true;
+
+      outBuf.limit(outBuf.capacity());
+      outBuf.rewind();
+
+      compressedBuf.limit(compressedBuf.capacity());
+      compressedBuf.rewind();
+
+    }
+
+    return n;
+  }
+
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    compressedBuf.limit(compressedBuf.capacity());
+    compressedBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  public synchronized void end() {
+  }
+
+}

Propchange: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java?rev=1137690&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java Mon Jun 20 16:32:27 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyDecompressor implements Decompressor {
+
+  private static final Log logger = LogFactory.getLog(SnappyDecompressor.class
+      .getName());
+
+  private boolean finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer uncompressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyDecompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+    // needs input if the uncompressed data was consumed
+    if (uncompressedBuf.position() > 0
+        && uncompressedBuf.limit() > uncompressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  public synchronized boolean finished() {
+    return finished;
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // nothing to decompress
+    if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished) {
+      reset();
+      finished = true;
+
+      return 0;
+    }
+
+    // only needs to do this once per input
+    if (uncompressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int neededLen = Snappy.uncompressedLength(outBuf);
+        outBuf.rewind();
+
+        if (neededLen > uncompressedBuf.capacity())
+          uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
+
+        int lim = Snappy.uncompress(outBuf, uncompressedBuf);
+
+        uncompressedBuf.limit(lim);
+        uncompressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len
+        : (uncompressedBuf.limit() - uncompressedBuf.position());
+
+    if (n == 0) {
+      reset();
+      finished = true;
+      return 0;
+    }
+
+    uncompressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (uncompressedBuf.position() == uncompressedBuf.limit()) {
+      reset();
+      finished = true;
+    }
+
+    return n;
+  }
+
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  public synchronized void reset() {
+    finished = false;
+
+    uncompressedBuf.limit(uncompressedBuf.capacity());
+    uncompressedBuf.rewind();
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void end() {
+    // do nothing
+  }
+
+  protected void finalize() {
+    end();
+  }
+
+}

Propchange: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java Mon Jun 20 16:32:27 2011
@@ -102,6 +102,12 @@ public class TestCodec {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
   }
+  
+  @Test
+  public void testSnappyCodec() throws IOException {
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
+  }
 
   @Test
   public void testGzipCodecWithParam() throws IOException {



Mime
View raw message