hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r494905 [1/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/lzo/ src/native/ src/native/src/ src/native/src/org/apache/hadoop/io/compress/lzo/ src/native/src/org/apache/hadoop/i...
Date: Wed, 10 Jan 2007 17:46:57 GMT
Author: cutting
Date: Wed Jan 10 09:46:54 2007
New Revision: 494905

URL: http://svn.apache.org/viewvc?view=rev&rev=494905
Log:
HADOOP-851.  Add support for the LZO codec.  Contributed by Arun.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/Makefile.am
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/Makefile.in
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/org_apache_hadoop_io_compress_lzo.h
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/native/Makefile.am
    lucene/hadoop/trunk/src/native/Makefile.in
    lucene/hadoop/trunk/src/native/NEWS
    lucene/hadoop/trunk/src/native/config.h.in
    lucene/hadoop/trunk/src/native/configure
    lucene/hadoop/trunk/src/native/configure.ac
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h
    lucene/hadoop/trunk/src/native/src/org_apache_hadoop.h
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 10 09:46:54 2007
@@ -32,6 +32,10 @@
 10. HADOOP-873.	 Pass java.library.path correctly to child processes.
     (omalley via cutting)
 
+11. HADOOP-851.  Add support for the LZO codec.  This is much faster
+    than the default, zlib-based compression, but it is only available
+    when the native library is built.  (Arun C Murthy via cutting)
+
 
 Release 0.10.0 - 2007-01-05
 

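A minimal sketch of how a client might use the new codec on the write path; it is illustrative only and not part of this change. The file name and payload are made up, and createOutputStream() throws an IOException when the native LZO library has not been built and loaded.

    import java.io.FileOutputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.LzoCodec;

    public class LzoWriteExample {
      public static void main(String[] args) throws Exception {
        if (!LzoCodec.isNativeLzoLoaded()) {
          System.err.println("native-lzo library not available");
          return;
        }
        LzoCodec codec = new LzoCodec();
        codec.setConf(new Configuration());

        // getDefaultExtension() returns ".lzo"
        OutputStream file = new FileOutputStream("data" + codec.getDefaultExtension());
        CompressionOutputStream out = codec.createOutputStream(file);
        out.write("hello, lzo".getBytes());
        out.finish();   // flush any buffered block before closing
        out.close();
      }
    }
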
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Wed Jan 10 09:46:54 2007
@@ -194,6 +194,7 @@
   	
     <mkdir dir="${build.native}/lib"/>
     <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/zlib"/>
+    <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/lzo"/>
 
   	<javah
   	  classpath="${build.classes}"
@@ -203,6 +204,16 @@
   	  >
   	  <class name="org.apache.hadoop.io.compress.zlib.ZlibCompressor" />
       <class name="org.apache.hadoop.io.compress.zlib.ZlibDecompressor" />
+  	</javah>
+
+  	<javah
+  	  classpath="${build.classes}"
+  	  destdir="${build.native}/src/org/apache/hadoop/io/compress/lzo"
+      force="yes"
+  	  verbose="yes"
+  	  >
+  	  <class name="org.apache.hadoop.io.compress.lzo.LzoCompressor" />
+      <class name="org.apache.hadoop.io.compress.lzo.LzoDecompressor" />
   	</javah>
 
 	<exec dir="${build.native}" executable="sh" failonerror="true">

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java?view=auto&rev=494905
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java Wed Jan 10 09:46:54 2007
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
+ * with 'block-based' compression algorithms, as opposed to 
+ * 'stream-based' compression algorithms.
+ *  
+ * @author Arun C Murthy
+ */
+class BlockCompressorStream extends CompressorStream {
+
+  // The 'maximum' size of input data to be compressed, to account
+  // for the overhead of the compression algorithm.
+  private final int MAX_INPUT_SIZE;
+
+  /**
+   * Create a {@link BlockCompressorStream}.
+   * 
+   * @param out stream
+   * @param compressor compressor to be used
+   * @param bufferSize size of buffer
+   * @param compressionOverhead maximum 'overhead' of the compression 
+   *                            algorithm with given bufferSize
+   */
+  public BlockCompressorStream(OutputStream out, Compressor compressor, 
+      int bufferSize, int compressionOverhead) {
+    super(out, compressor, bufferSize);
+    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
+  }
+
+  /**
+   * Create a {@link BlockCompressorStream} with given output-stream and 
+   * compressor.
+   * Use default of 512 as bufferSize and compressionOverhead of 
+   * (1% of bufferSize + 12 bytes) =  18 bytes (zlib algorithm).
+   * 
+   * @param out stream
+   * @param compressor compressor to be used
+   */
+  public BlockCompressorStream(OutputStream out, Compressor compressor) {
+    this(out, compressor, 512, 18);
+  }
+
+  public void write(byte[] b, int off, int len) throws IOException {
+    // Sanity checks
+    if (compressor.finished()) {
+      throw new IOException("write beyond end of stream");
+    }
+    if (b == null) {
+      throw new NullPointerException();
+    } else if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+
+    // Write out the length of the original data
+    rawWriteInt(len);
+    
+    // Compress data
+    if (!compressor.finished()) {
+      do {
+        // Compress at most MAX_INPUT_SIZE bytes at a time
+        int bufLen = Math.min(len, MAX_INPUT_SIZE);
+        
+        compressor.setInput(b, off, bufLen);
+        while (!compressor.needsInput()) {
+          compress();
+        }
+        off += bufLen;
+        len -= bufLen;
+      } while (len > 0);
+    }
+  }
+
+  void compress() throws IOException {
+    int len = compressor.compress(buffer, 0, buffer.length);
+    if (len > 0) {
+      // Write out the compressed chunk
+      rawWriteInt(len);
+      out.write(buffer, 0, len);
+    }
+  }
+  
+  private void rawWriteInt(int v) throws IOException {
+    out.write((v >>> 24) & 0xFF);
+    out.write((v >>> 16) & 0xFF);
+    out.write((v >>>  8) & 0xFF);
+    out.write((v >>>  0) & 0xFF);
+  }
+
+}

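The stream above frames each block as the 4-byte big-endian length of the original data, followed by one or more [4-byte compressed-chunk length, chunk bytes] pairs. Below is a rough sketch of that framing with java.util.zip.Deflater standing in for the native LZO compressor; the framing, not the codec, is the point, and the class and variable names are made up.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.Deflater;

    public class BlockFramingDemo {
      public static byte[] frame(byte[] data) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(data.length);            // length of the original data

        Deflater deflater = new Deflater();   // stand-in for the LZO compressor
        deflater.setInput(data);
        deflater.finish();
        byte[] chunk = new byte[64 * 1024];
        while (!deflater.finished()) {
          int n = deflater.deflate(chunk);
          if (n > 0) {
            out.writeInt(n);                  // length of this compressed chunk
            out.write(chunk, 0, n);           // compressed chunk bytes
          }
        }
        deflater.end();
        out.flush();
        return bytes.toByteArray();
      }
    }
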
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java?view=auto&rev=494905
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java Wed Jan 10 09:46:54 2007
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
+ * with 'block-based' compression algorithms, as opposed to 
+ * 'stream-based' compression algorithms.
+ *  
+ * @author Arun C Murthy
+ */
+class BlockDecompressorStream extends DecompressorStream {
+  private int originalBlockSize = 0;
+  private int noUncompressedBytes = 0;
+
+  /**
+   * Create a {@link BlockDecompressorStream}.
+   * 
+   * @param in input stream
+   * @param decompressor decompressor to use
+   * @param bufferSize size of buffer
+   */
+  public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
+      int bufferSize) {
+    super(in, decompressor, bufferSize);
+  }
+  
+  /**
+   * Create a {@link BlockDecompressorStream}.
+   * 
+   * @param in input stream
+   * @param decompressor decompressor to use
+   */
+  public BlockDecompressorStream(InputStream in, Decompressor decompressor) {
+    super(in, decompressor);
+  }
+
+  protected BlockDecompressorStream(InputStream in) {
+    super(in);
+  }
+
+  int decompress(byte[] b, int off, int len) throws IOException {
+    // Check if we are at the beginning of a block
+    if (noUncompressedBytes == originalBlockSize) {
+      // Get original data size
+      try {
+        originalBlockSize =  rawReadInt();
+      } catch (IOException ioe) {
+        return -1;
+      }
+      noUncompressedBytes = 0;
+    }
+    
+    int n = 0;
+    while ((n = decompressor.decompress(b, off, len)) == 0) {
+      if (decompressor.finished() || decompressor.needsDictionary()) {
+        if (noUncompressedBytes >= originalBlockSize) {
+          eof = true;
+          return -1;
+        }
+      }
+      if (decompressor.needsInput()) {
+        getCompressedData();
+      }
+    }
+    
+    // Note the no. of decompressed bytes read from 'current' block
+    noUncompressedBytes += n;
+
+    return n;
+  }
+
+  void getCompressedData() throws IOException {
+    checkStream();
+
+    // Get the size of the compressed chunk
+    int len = rawReadInt();
+
+    // Read len bytes from underlying stream 
+    if (len > buffer.length) {
+      buffer = new byte[len];
+    }
+    int n = 0, off = 0;
+    while (n < len) {
+      int count = in.read(buffer, off + n, len - n);
+      if (count < 0) {
+        throw new EOFException();
+      }
+      n += count;
+    }
+    
+    // Send the read data to the decompressor
+    decompressor.setInput(buffer, 0, len);
+  }
+
+  public void resetState() throws IOException {
+    super.resetState();
+  }
+
+  private int rawReadInt() throws IOException {
+    int b1 = in.read();
+    int b2 = in.read();
+    int b3 = in.read();
+    int b4 = in.read();
+    if ((b1 | b2 | b3 | b4) < 0)
+        throw new EOFException();
+    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
+  }
+}

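Reading that framing back is the mirror image: read the original length, then feed [chunk length, chunk bytes] pairs to the decompressor until the original length has been produced. A sketch under the same stand-in assumption (Inflater in place of the native LZO decompressor; names are made up):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.zip.DataFormatException;
    import java.util.zip.Inflater;

    public class BlockFrameReaderDemo {
      public static byte[] readBlock(DataInputStream in)
          throws IOException, DataFormatException {
        int originalLen = in.readInt();        // uncompressed size of the block
        byte[] result = new byte[originalLen];
        Inflater inflater = new Inflater();    // stand-in for the LZO decompressor
        int produced = 0;
        while (produced < originalLen) {
          if (inflater.needsInput()) {
            int chunkLen = in.readInt();       // size of the next compressed chunk
            byte[] chunk = new byte[chunkLen];
            in.readFully(chunk);
            inflater.setInput(chunk);
          }
          produced += inflater.inflate(result, produced, originalLen - produced);
        }
        inflater.end();
        return result;
      }
    }
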
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?view=auto&rev=494905
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Wed Jan 10 09:46:54 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.lzo.*;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
+ * <b>lzo</b> compression/decompression pair.
+ * http://www.oberhumer.com/opensource/lzo/
+ * 
+ * @author Arun C Murthy
+ */
+public class LzoCodec implements Configurable, CompressionCodec {
+  
+  private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());
+
+  private Configuration conf;
+  
+  public void setConf(Configuration conf) {
+	  this.conf = conf;
+  }
+  
+  public Configuration getConf() {
+	  return conf;
+  }
+
+  private static boolean nativeLzoLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      nativeLzoLoaded = LzoCompressor.isNativeLzoLoaded() &&
+                          LzoDecompressor.isNativeLzoLoaded();
+      
+      if (nativeLzoLoaded) {
+        LOG.info("Successfully loaded & initialized native-lzo library");
+      } else {
+        LOG.error("Failed to load/initialize native-lzo library");
+      }
+    } else {
+      LOG.error("Cannot load native-lzo without native-hadoop");
+    }
+  }
+
+  /**
+   * Check if native-lzo library is loaded & initialized.
+   * 
+   * @return <code>true</code> if native-lzo library is loaded & initialized;
+   *         else <code>false</code>
+   */
+  public static boolean isNativeLzoLoaded() {
+    return nativeLzoLoaded;
+  }
+  
+  public CompressionOutputStream createOutputStream(OutputStream out) 
+  throws IOException {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new IOException("native-lzo library not available");
+    }
+    
+    /**
+     * <b>http://www.oberhumer.com/opensource/lzo/lzofaq.php</b>
+     *
+     * How much can my data expand during compression ?
+     * ================================================
+     * LZO will expand incompressible data by a little amount.
+     * I still haven't computed the exact values, but I suggest using
+     * these formulas for a worst-case expansion calculation:
+     * 
+     * Algorithm LZO1, LZO1A, LZO1B, LZO1C, LZO1F, LZO1X, LZO1Y, LZO1Z:
+     * ----------------------------------------------------------------
+     * output_block_size = input_block_size + (input_block_size / 16) + 64 + 3
+     * 
+     * This is about 106% for a large block size.
+     * 
+     * Algorithm LZO2A:
+     * ----------------
+     * output_block_size = input_block_size + (input_block_size / 8) + 128 + 3
+     */
+
+    // Create the lzo output-stream
+    LzoCompressor.CompressionStrategy strategy = 
+      LzoCompressor.CompressionStrategy.valueOf(
+              conf.get("io.compression.codec.lzo.compressor",
+                        LzoCompressor.CompressionStrategy.LZO1X_1.name()
+                      )
+                    ); 
+    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
+                                  64*1024);
+    int compressionOverhead = 0;
+    if (strategy.name().contains("LZO1")) {
+      compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0);  
+    } else {
+      compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
+    }
+     
+    return new BlockCompressorStream(out, 
+            new LzoCompressor(strategy, bufferSize), 
+            bufferSize, compressionOverhead);
+  }
+  
+  public CompressionInputStream createInputStream(InputStream in) 
+  throws IOException {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new IOException("native-lzo library not available");
+    }
+    
+    // Create the lzo input-stream
+    LzoDecompressor.CompressionStrategy strategy = 
+      LzoDecompressor.CompressionStrategy.valueOf(
+              conf.get("io.compression.codec.lzo.decompressor",
+                        LzoDecompressor.CompressionStrategy.LZO1X.name()
+                      )
+                    ); 
+    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
+                                  64*1024);
+
+    return new BlockDecompressorStream(in, 
+            new LzoDecompressor(strategy, bufferSize), 
+            bufferSize);
+  }
+  
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  public String getDefaultExtension() {
+    return ".lzo";
+  }
+}

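A worked instance of the LZO1X worst-case expansion formula quoted in createOutputStream() above, assuming the default 64 KB buffer; the class name is illustrative only.

    public class LzoWorstCase {
      // output_block_size = input_block_size + (input_block_size / 16) + 64 + 3
      static int worstCaseOutputSize(int inputSize) {
        return inputSize + (inputSize / 16) + 64 + 3;
      }

      public static void main(String[] args) {
        int input = 64 * 1024;                          // default buffer size
        System.out.println(worstCaseOutputSize(input)); // 69699 bytes, about 106%
      }
    }
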
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java?view=auto&rev=494905
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java Wed Jan 10 09:46:54 2007
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Compressor} based on the lzo algorithm.
+ * http://www.oberhumer.com/opensource/lzo/
+ * 
+ * @author Arun C Murthy
+ */
+public class LzoCompressor implements Compressor {
+  private static final Log LOG = 
+    LogFactory.getLog(LzoCompressor.class.getName());
+
+  private int directBufferSize;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private Buffer uncompressedDirectBuf = null;
+  private int uncompressedDirectBufLen = 0;
+  private Buffer compressedDirectBuf = null;
+  private boolean finish, finished;
+  
+  private CompressionStrategy strategy; // The lzo compression algorithm.
+  private long lzoCompressor = 0;       // The actual lzo compression function.
+  private int workingMemoryBufLen = 0;  // The length of 'working memory' buf.
+  private Buffer workingMemoryBuf;      // The 'working memory' for lzo.
+  
+  /**
+   * The compression algorithm for lzo library.
+   */
+  public static enum CompressionStrategy {
+    /**
+     * lzo1 algorithms.
+     */
+    LZO1 (0),
+    LZO1_99 (1),
+    
+    /**
+     * lzo1a algorithms.
+     */
+    LZO1A (2),
+    LZO1A_99 (3),
+    
+    /**
+     * lzo1b algorithms.
+     */
+    LZO1B (4),
+    LZO1B_BEST_COMPRESSION(5),
+    LZO1B_BEST_SPEED(6),
+    LZO1B_1 (7),
+    LZO1B_2 (8),
+    LZO1B_3 (9),
+    LZO1B_4 (10),
+    LZO1B_5 (11),
+    LZO1B_6 (12),
+    LZO1B_7 (13),
+    LZO1B_8 (14),
+    LZO1B_9 (15),
+    LZO1B_99 (16),
+    LZO1B_999 (17),
+
+    /**
+     * lzo1c algorithms.
+     */
+    LZO1C (18),
+    LZO1C_BEST_COMPRESSION(19),
+    LZO1C_BEST_SPEED(20),
+    LZO1C_1 (21),
+    LZO1C_2 (22),
+    LZO1C_3 (23),
+    LZO1C_4 (24),
+    LZO1C_5 (25),
+    LZO1C_6 (26),
+    LZO1C_7 (27),
+    LZO1C_8 (28),
+    LZO1C_9 (29),
+    LZO1C_99 (30),
+    LZO1C_999 (31),
+    
+    /**
+     * lzo1f algorithms.
+     */
+    LZO1F_1 (32),
+    LZO1F_999 (33),
+    
+    /**
+     * lzo1x algorithms.
+     */
+    LZO1X_1 (34),
+    LZO1X_11 (35),
+    LZO1X_12 (36),
+    LZO1X_15 (37),
+    LZO1X_999 (38),
+    
+    /**
+     * lzo1y algorithms.
+     */
+    LZO1Y_1 (39),
+    LZO1Y_999 (40),
+    
+    /**
+     * lzo1z algorithms.
+     */
+    LZO1Z_999 (41),
+    
+    /**
+     * lzo2a algorithms.
+     */
+    LZO2A_999 (42);
+    
+    private final int compressor;
+
+    private CompressionStrategy(int compressor) {
+      this.compressor = compressor;
+    }
+    
+    int getCompressor() {
+      return compressor;
+    }
+  }; // CompressionStrategy
+
+  private static boolean nativeLzoLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      // Initialize the native library
+      initIDs();
+      nativeLzoLoaded = true;
+    } else {
+      LOG.error("Cannot load " + LzoCompressor.class.getName() + 
+              " without native-hadoop library!");
+    }
+  }
+  
+  /**
+   * Check if lzo compressors are loaded and initialized.
+   * 
+   * @return <code>true</code> if lzo compressors are loaded & initialized,
+   *         else <code>false</code> 
+   */
+  public static boolean isNativeLzoLoaded() {
+    return nativeLzoLoaded;
+  }
+
+  /** 
+   * Creates a new compressor using the specified {@link CompressionStrategy}.
+   * 
+   * @param strategy lzo compression algorithm to use
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
+    this.strategy = strategy;
+    this.directBufferSize = directBufferSize;
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    
+    /**
+     * Initialize {@link #lzoCompress} and {@link #workingMemoryBufLen}
+     */
+    init(this.strategy.getCompressor());
+    workingMemoryBuf = ByteBuffer.allocateDirect(workingMemoryBufLen);
+  }
+  
+  /**
+   * Creates a new compressor with the default lzo1x_1 compression.
+   */
+  public LzoCompressor() {
+    this(CompressionStrategy.LZO1X_1, 64*1024);
+  }
+  
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+
+    // Reinitialize lzo's output direct-buffer 
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  synchronized void setInputFromSavedData() {
+    uncompressedDirectBufLen = userBufLen;
+    if (uncompressedDirectBufLen > directBufferSize) {
+      uncompressedDirectBufLen = directBufferSize;
+    }
+
+    // Reinitialize lzo's input direct buffer
+    uncompressedDirectBuf.rewind();
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,  
+                                          uncompressedDirectBufLen);
+
+    // Note how much data is being fed to lzo
+    userBufOff += uncompressedDirectBufLen;
+    userBufLen -= uncompressedDirectBufLen;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // nop
+  }
+
+  public boolean needsInput() {
+    // Consume remaining compressed data?
+    if (compressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if lzo has consumed all input
+    if (uncompressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    
+    return false;
+  }
+  
+  public synchronized void finish() {
+    finish = true;
+  }
+  
+  public synchronized boolean finished() {
+    // Check if 'lzo' says it's 'finished' and
+    // all compressed data has been consumed
+    return (finished && compressedDirectBuf.remaining() == 0); 
+  }
+
+  public synchronized int compress(byte[] b, int off, int len) 
+  throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    int n = 0;
+    
+    // Check if there is compressed data
+    n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+      return n;
+    }
+
+    // Re-initialize the lzo's output direct-buffer
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.limit(directBufferSize);
+
+    // Compress data
+    n = compressBytesDirect(strategy.getCompressor());
+    compressedDirectBuf.limit(n);
+    
+    // Set 'finished' if lzo has consumed all user-data
+    if (userBufLen <= 0) {
+      finished = true;
+    }
+    
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBufLen = 0;
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+  
+  public synchronized void end() {
+    // nop
+  }
+  
+  private native static void initIDs();
+  private native void init(int compressor);
+  private native int compressBytesDirect(int compressor);
+}

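The io.compression.codec.lzo.* keys read by LzoCodec select one of the CompressionStrategy values above by its enum name. A hedged sketch of overriding the defaults (LZO1X_1 / LZO1X and a 64 KB buffer); the chosen values below are examples only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.LzoCodec;

    public class LzoCodecConfigExample {
      public static LzoCodec newCodec() {
        Configuration conf = new Configuration();
        conf.set("io.compression.codec.lzo.compressor", "LZO1X_999");  // heavier compression
        conf.set("io.compression.codec.lzo.decompressor", "LZO1X");    // matching decompressor
        conf.set("io.compression.codec.lzo.buffersize", "262144");     // 256 KB blocks
        LzoCodec codec = new LzoCodec();
        codec.setConf(conf);
        return codec;
      }
    }
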
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java?view=auto&rev=494905
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java Wed Jan 10 09:46:54 2007
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Decompressor} based on the lzo algorithm.
+ * http://www.oberhumer.com/opensource/lzo/
+ * 
+ * @author Arun C Murthy
+ */
+public class LzoDecompressor implements Decompressor {
+  private static final Log LOG = 
+    LogFactory.getLog(LzoDecompressor.class.getName());
+  
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+  
+  private CompressionStrategy strategy;
+  private long lzoDecompressor = 0;   // The actual lzo decompression function.
+  
+  public static enum CompressionStrategy {
+    /**
+     * lzo1 algorithms.
+     */
+    LZO1 (0),
+
+    /**
+     * lzo1a algorithms.
+     */
+    LZO1A (1),
+
+    /**
+     * lzo1b algorithms.
+     */
+    LZO1B (2),
+    LZO1B_SAFE(3),
+
+    /**
+     * lzo1c algorithms.
+     */
+    LZO1C (4),
+    LZO1C_SAFE(5),
+    LZO1C_ASM (6),
+    LZO1C_ASM_SAFE (7),
+
+    /**
+     * lzo1f algorithms.
+     */
+    LZO1F (8),
+    LZO1F_SAFE (9),
+    LZO1F_ASM_FAST (10),
+    LZO1F_ASM_FAST_SAFE (11),
+    
+    /**
+     * lzo1x algorithms.
+     */
+    LZO1X (12),
+    LZO1X_SAFE (13),
+    LZO1X_ASM (14),
+    LZO1X_ASM_SAFE (15),
+    LZO1X_ASM_FAST (16),
+    LZO1X_ASM_FAST_SAFE (17),
+    
+    /**
+     * lzo1y algorithms.
+     */
+    LZO1Y (18),
+    LZO1Y_SAFE (19),
+    LZO1Y_ASM (20),
+    LZO1Y_ASM_SAFE (21),
+    LZO1Y_ASM_FAST (22),
+    LZO1Y_ASM_FAST_SAFE (23),
+    
+    /**
+     * lzo1z algorithms.
+     */
+    LZO1Z (24),
+    LZO1Z_SAFE (25),
+    
+    /**
+     * lzo2a algorithms.
+     */
+    LZO2A (26),
+    LZO2A_SAFE (27);
+    
+    private final int decompressor;
+
+    private CompressionStrategy(int decompressor) {
+      this.decompressor = decompressor;
+    }
+    
+    int getDecompressor() {
+      return decompressor;
+    }
+  }; // CompressionStrategy
+  
+  private static boolean nativeLzoLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      // Initialize the native library
+      initIDs();
+      nativeLzoLoaded = true;
+    } else {
+      LOG.error("Cannot load " + LzoDecompressor.class.getName() + 
+              " without native-hadoop library!");
+    }
+  }
+  
+  /**
+   * Check if lzo decompressors are loaded and initialized.
+   * 
+   * @return <code>true</code> if lzo decompressors are loaded & initialized,
+   *         else <code>false</code> 
+   */
+  public static boolean isNativeLzoLoaded() {
+    return nativeLzoLoaded;
+  }
+
+  /**
+   * Creates a new lzo decompressor.
+   * 
+   * @param strategy lzo decompression algorithm
+   * @param directBufferSize size of the direct-buffer
+   */
+  public LzoDecompressor(CompressionStrategy strategy, int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+    this.strategy = strategy;
+    
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    
+    /**
+     * Initialize {@link #lzoDecompress}
+     */
+    init(this.strategy.getDecompressor());
+  }
+  
+  /**
+   * Creates a new lzo decompressor.
+   */
+  public LzoDecompressor() {
+    this(CompressionStrategy.LZO1X, 64*1024);
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    
+    setInputFromSavedData();
+    
+    // Reinitialize lzo's output direct-buffer 
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+  
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufLen = userBufLen;
+    if (compressedDirectBufLen > directBufferSize) {
+      compressedDirectBufLen = directBufferSize;
+    }
+
+    // Reinitialize lzo's input direct-buffer
+    compressedDirectBuf.rewind();
+    ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, 
+                                        compressedDirectBufLen);
+    
+    // Note how much data is being fed to lzo
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // nop
+  }
+
+  public synchronized boolean needsInput() {
+    // Consume remaining uncompressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+    
+    // Check if lzo has consumed all input
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    
+    return false;
+  }
+
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  public synchronized boolean finished() {
+    // Check if 'lzo' says it's 'finished' and
+    // all uncompressed data has been consumed
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len) 
+  throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    int n = 0;
+    
+    // Check if there is uncompressed data
+    n = uncompressedDirectBuf.remaining();
+    if(n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    
+    // Check if there is data to decompress
+    if (compressedDirectBufLen <= 0) {
+      return 0;
+    }
+    
+    // Re-initialize the lzo's output direct-buffer
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.limit(directBufferSize);
+
+    // Decompress data
+    n = decompressBytesDirect(strategy.getDecompressor());
+    uncompressedDirectBuf.limit(n);
+
+    // Set 'finished' if lzo has consumed all user-data
+    if (userBufLen <= 0) {
+      finished = true;
+    }
+    
+    // Return at most 'len' bytes
+    n = Math.min(n, len);
+    ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+  
+  public synchronized void reset() {
+    finished = false;
+    compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  public synchronized void end() {
+    // nop
+  }
+
+  protected void finalize() {
+    end();
+  }
+  
+  private native static void initIDs();
+  private native void init(int decompressor);
+  private native int decompressBytesDirect(int decompressor);
+}

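The read path mirrors the write path shown earlier: createInputStream() wraps the input in a BlockDecompressorStream over an LzoDecompressor. A minimal sketch, not part of this change, with an illustrative file name:

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.LzoCodec;

    public class LzoReadExample {
      public static void main(String[] args) throws IOException {
        LzoCodec codec = new LzoCodec();
        codec.setConf(new Configuration());
        CompressionInputStream in =
            codec.createInputStream(new FileInputStream("data.lzo"));
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
          System.out.write(buf, 0, n);   // uncompressed bytes
        }
        System.out.flush();
        in.close();
      }
    }
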
Modified: lucene/hadoop/trunk/src/native/Makefile.am
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/Makefile.am?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/src/native/Makefile.am (original)
+++ lucene/hadoop/trunk/src/native/Makefile.am Wed Jan 10 09:46:54 2007
@@ -36,7 +36,7 @@
 export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z])
 
 # List the sub-directories here
-SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib
+SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib
 
 # The following export is needed to build libhadoop.so in the 'lib' directory
 export SUBDIRS

Modified: lucene/hadoop/trunk/src/native/Makefile.in
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/Makefile.in?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/src/native/Makefile.in (original)
+++ lucene/hadoop/trunk/src/native/Makefile.in Wed Jan 10 09:46:54 2007
@@ -207,7 +207,7 @@
 target_alias = @target_alias@
 
 # List the sub-directories here
-SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib
+SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib
 all: config.h
 	$(MAKE) $(AM_MAKEFLAGS) all-recursive
 

Modified: lucene/hadoop/trunk/src/native/NEWS
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/NEWS?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/src/native/NEWS (original)
+++ lucene/hadoop/trunk/src/native/NEWS Wed Jan 10 09:46:54 2007
@@ -1,3 +1,5 @@
 2006-10-05 Arun C Murthy <arunc@yahoo-inc.com>
   * Initial version of libhadoop released
 
+2007-01-03 Arun C Murthy <arunc@yahoo-inc.com>
+  * Added support for lzo compression library 

Modified: lucene/hadoop/trunk/src/native/config.h.in
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/config.h.in?view=diff&rev=494905&r1=494904&r2=494905
==============================================================================
--- lucene/hadoop/trunk/src/native/config.h.in (original)
+++ lucene/hadoop/trunk/src/native/config.h.in Wed Jan 10 09:46:54 2007
@@ -1,5 +1,8 @@
 /* config.h.in.  Generated from configure.ac by autoheader.  */
 
+/* The 'actual' dynamic-library for '-llzo2' */
+#undef HADOOP_LZO_LIBRARY
+
 /* The 'actual' dynamic-library for '-lz' */
 #undef HADOOP_ZLIB_LIBRARY
 
@@ -18,8 +21,41 @@
 /* Define to 1 if you have the `jvm' library (-ljvm). */
 #undef HAVE_LIBJVM
 
+/* Define to 1 if you have the `lzo2' library (-llzo2). */
+#undef HAVE_LIBLZO2
+
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
+
+/* Define to 1 if you have the <lzo/lzo1a.h> header file. */
+#undef HAVE_LZO_LZO1A_H
+
+/* Define to 1 if you have the <lzo/lzo1b.h> header file. */
+#undef HAVE_LZO_LZO1B_H
+
+/* Define to 1 if you have the <lzo/lzo1c.h> header file. */
+#undef HAVE_LZO_LZO1C_H
+
+/* Define to 1 if you have the <lzo/lzo1f.h> header file. */
+#undef HAVE_LZO_LZO1F_H
+
+/* Define to 1 if you have the <lzo/lzo1x.h> header file. */
+#undef HAVE_LZO_LZO1X_H
+
+/* Define to 1 if you have the <lzo/lzo1y.h> header file. */
+#undef HAVE_LZO_LZO1Y_H
+
+/* Define to 1 if you have the <lzo/lzo1z.h> header file. */
+#undef HAVE_LZO_LZO1Z_H
+
+/* Define to 1 if you have the <lzo/lzo1.h> header file. */
+#undef HAVE_LZO_LZO1_H
+
+/* Define to 1 if you have the <lzo/lzo2a.h> header file. */
+#undef HAVE_LZO_LZO2A_H
+
+/* Define to 1 if you have the <lzo/lzo_asm.h> header file. */
+#undef HAVE_LZO_LZO_ASM_H
 
 /* Define to 1 if you have the <memory.h> header file. */
 #undef HAVE_MEMORY_H


