hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r1301165 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Thu, 15 Mar 2012 19:32:04 GMT
Author: stack
Date: Thu Mar 15 19:32:01 2012
New Revision: 1301165

URL: http://svn.apache.org/viewvc?rev=1301165&view=rev
Log:
HBASE-4608 HLog Compression

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Mar 15 19:32:01 2012
@@ -644,6 +644,10 @@ public final class HConstants {
   /** File permission umask to use when creating hbase data files */
   public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask";
 
+  /** Configuration name of HLog Compression */
+  public static final String ENABLE_WAL_COMPRESSION =
+    "hbase.regionserver.wal.enablecompression";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

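The feature is off by default; SequenceFileLogWriter.init() below reads this flag with a false fallback, so enabling it is a one-line configuration change (or the equivalent hbase.regionserver.wal.enablecompression property in hbase-site.xml). A minimal sketch, using only classes already referenced by this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    // New WAL files written with this conf will use dictionary compression.
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
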
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Context that holds the various dictionaries for compression in HLog.
+ */
+@InterfaceAudience.Private
+class CompressionContext {
+  final Dictionary regionDict;
+  final Dictionary tableDict;
+  final Dictionary familyDict;
+  final Dictionary qualifierDict;
+  final Dictionary rowDict;
+
+  public CompressionContext(Class<? extends Dictionary> dictType)
+  throws SecurityException, NoSuchMethodException, InstantiationException,
+      IllegalAccessException, InvocationTargetException {
+    Constructor<? extends Dictionary> dictConstructor =
+        dictType.getConstructor();
+    regionDict = dictConstructor.newInstance();
+    tableDict = dictConstructor.newInstance();
+    familyDict = dictConstructor.newInstance();
+    qualifierDict = dictConstructor.newInstance();
+    rowDict = dictConstructor.newInstance();
+  }
+
+  void clear() {
+    regionDict.clear();
+    tableDict.clear();
+    familyDict.clear();
+    qualifierDict.clear();
+    rowDict.clear();
+  }
+}

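The context is deliberately simple: five independent dictionaries, one per frequently repeated field, built reflectively from whatever Dictionary implementation is passed in (only LRUDictionary ships in this patch). A sketch of the lifecycle, mirroring how the reader and writer below use it:

    CompressionContext ctx;
    try {
      // Reflection lets tests swap in other Dictionary implementations.
      ctx = new CompressionContext(LRUDictionary.class);
    } catch (Exception e) {
      throw new IOException("Failed to initialize CompressionContext", e);
    }
    // Reset all five dictionaries, e.g. when a reader re-inits on a new file.
    ctx.clear();
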
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+
+/**
+ * A set of static functions for running our custom WAL compression/decompression.
+ * Also contains a command line tool to compress and uncompress HLogs.
+ */
+@InterfaceAudience.Private
+public class Compressor {
+  /**
+   * Command line tool to compress and uncompress WALs.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
+      printHelp();
+      System.exit(-1);
+    }
+
+    Path inputPath = new Path(args[0]);
+    Path outputPath = new Path(args[1]);
+
+    transformFile(inputPath, outputPath);
+  }
+
+  private static void printHelp() {
+    System.err.println("usage: Compressor <input> <output>");
+    System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
+    System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
+    return;
+  }
+
+  private static void transformFile(Path input, Path output)
+      throws IOException {
+    SequenceFileLogReader in = new SequenceFileLogReader();
+    SequenceFileLogWriter out = new SequenceFileLogWriter();
+
+    try {
+      Configuration conf = HBaseConfiguration.create();
+
+      FileSystem inFS = input.getFileSystem(conf);
+      FileSystem outFS = output.getFileSystem(conf);
+
+      in.init(inFS, input, conf);
+      boolean compress = in.reader.isWALCompressionEnabled();
+
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
+      out.init(outFS, output, conf);
+
+      Entry e = null;
+      while ((e = in.next()) != null) out.append(e);
+    } finally {
+      in.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Reads the next compressed entry and returns it as a byte array
+   * 
+   * @param in the DataInput to read from
+   * @param dict the dictionary we use for our read.
+   * 
+   * @return the uncompressed array.
+   */
+  static byte[] readCompressed(DataInput in, Dictionary dict)
+      throws IOException {
+    byte status = in.readByte();
+
+    if (status == Dictionary.NOT_IN_DICTIONARY) {
+      int length = WritableUtils.readVInt(in);
+      // if this isn't in the dictionary, we need to add to the dictionary.
+      byte[] arr = new byte[length];
+      in.readFully(arr);
+      if (dict != null) dict.addEntry(arr, 0, length);
+      return arr;
+    } else {
+      // Status here is the higher-order byte of index of the dictionary entry
+      // (when it's not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
+      // shorts).
+      short dictIdx = toShort(status, in.readByte());
+      byte[] entry = dict.getEntry(dictIdx);
+      if (entry == null) {
+        throw new IOException("Missing dictionary entry for index "
+            + dictIdx);
+      }
+      return entry;
+    }
+  }
+
+  /**
+   * Reads a compressed entry into an array.
+   * The output into the array ends up length-prefixed.
+   * 
+   * @param to the array to write into
+   * @param offset array offset to start writing to
+   * @param in the DataInput to read from
+   * @param dict the dictionary to use for the read
+   * 
+   * @return the length of the uncompressed data
+   */
+  static int uncompressIntoArray(byte[] to, int offset, DataInput in,
+      Dictionary dict) throws IOException {
+    byte status = in.readByte();
+
+    if (status == Dictionary.NOT_IN_DICTIONARY) {
+      // status byte indicating that data to be read is not in dictionary.
+      // if this isn't in the dictionary, we need to add to the dictionary.
+      int length = WritableUtils.readVInt(in);
+      in.readFully(to, offset, length);
+      dict.addEntry(to, offset, length);
+      return length;
+    } else {
+      // the status byte also acts as the higher order byte of the dictionary
+      // entry
+      short dictIdx = toShort(status, in.readByte());
+      byte[] entry = dict.getEntry(dictIdx);
+      if (entry == null) {
+        throw new IOException("Missing dictionary entry for index "
+            + dictIdx);
+      }
+      // now we write the uncompressed value.
+      Bytes.putBytes(to, offset, entry, 0, entry.length);
+      return entry.length;
+    }
+  }
+
+  /**
+   * Compresses and writes an array to a DataOutput
+   * 
+   * @param data the array to write.
+   * @param out the DataOutput to write into
+   * @param dict the dictionary to use for compression
+   */
+  static void writeCompressed(byte[] data, int offset, int length,
+      DataOutput out, Dictionary dict)
+      throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (dict != null) {
+      dictIdx = dict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      // not in dict
+      out.writeByte(Dictionary.NOT_IN_DICTIONARY);
+      WritableUtils.writeVInt(out, length);
+      out.write(data, offset, length);
+    } else {
+      out.writeShort(dictIdx);
+    }
+  }
+
+  static short toShort(byte hi, byte lo) {
+    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+    Preconditions.checkArgument(s >= 0);
+    return s;
+  }
+}

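On the wire each value is either a 0xFF status byte (Dictionary.NOT_IN_DICTIONARY) followed by a vint length and the raw bytes, or a two-byte dictionary index whose high byte doubles as the status byte; that is why toShort() rejects negative results. A round-trip sketch in the spirit of TestCompressor below -- note that the reading dictionary must replay the same sequence of values the writing dictionary saw, or indices will not resolve:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    Dictionary writeDict = new LRUDictionary();
    byte[] data = Bytes.toBytes("myTable");
    // First write is a miss: status byte + vint length + raw bytes
    // (and the miss itself seeds the dictionary).
    Compressor.writeCompressed(data, 0, data.length, dos, writeDict);
    // Second write is a hit: only the two-byte dictionary index.
    Compressor.writeCompressed(data, 0, data.length, dos, writeDict);
    dos.close();

    DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    Dictionary readDict = new LRUDictionary();
    byte[] first = Compressor.readCompressed(dis, readDict);  // raw bytes; added to readDict
    byte[] second = Compressor.readCompressed(dis, readDict); // resolved via the index
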
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indices are positive bytes or shorts.  (The sign bit is
+ * reserved to flag whether a value is dictionary-compressed or not.)
+ */
+@InterfaceAudience.Private
+interface Dictionary {
+  static final byte NOT_IN_DICTIONARY = -1;
+
+  /**
+   * Gets an entry from the dictionary.
+   * 
+   * @param idx index of the entry
+   * @return the entry, or null if nonexistent
+   */
+  public byte[] getEntry(short idx);
+
+  /**
+   * Finds the index of an entry.
+   * If no entry is found, the data is added.
+   * 
+   * @param data the byte array that we're looking up
+   * @param offset Offset into <code>data</code> to add to Dictionary.
+   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  public short findEntry(byte[] data, int offset, int length);
+
+  /**
+   * Adds an entry to the dictionary.
+   * Be careful using this method.  It will add an entry to the
+   * dictionary even if it already has an entry for the same data.
+   * Call {@link #findEntry(byte[], int, int)} to add without duplicating
+   * dictionary entries.
+   * 
+   * @param data the entry to add
+   * @param offset Offset into <code>data</code> to add to Dictionary.
+   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+   * @return the index of the entry
+   */
+
+  public short addEntry(byte[] data, int offset, int length);
+
+  /**
+   * Flushes the dictionary, empties all values.
+   */
+  public void clear();
+}

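Note the slightly surprising contract that LRUDictionary below adopts: a findEntry() miss returns NOT_IN_DICTIONARY but also installs the entry, so the very next lookup of equal bytes hits. A sketch:

    Dictionary dict = new LRUDictionary();
    byte[] row = Bytes.toBytes("row-0");
    short first = dict.findEntry(row, 0, row.length);
    short second = dict.findEntry(row, 0, row.length);
    assert first == Dictionary.NOT_IN_DICTIONARY; // miss, but the entry was added
    assert second >= 0;                           // hit on the index just assigned
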
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Mar 15 19:32:01 2012
@@ -1671,6 +1671,15 @@ public class HLog implements Syncable {
       return key;
     }
 
+    /**
+     * Set compression context for this entry.
+     * @param compressionContext Compression context
+     */
+    public void setCompressionContext(CompressionContext compressionContext) {
+      edit.setCompressionContext(compressionContext);
+      key.setCompressionContext(compressionContext);
+    }
+
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

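Entry.setCompressionContext() is the single hook that wires the dictionaries into both halves of a WAL record; the reader and writer below each call it once per entry. A sketch, assuming an already-built key, edit, and context (the Entry(HLogKey, WALEdit) constructor is taken from the surrounding HLog code):

    HLog.Entry entry = new HLog.Entry(key, edit);
    // One call pushes the same CompressionContext into both key and edit.
    entry.setCompressionContext(ctx);
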
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Thu Mar 15 19:32:01 2012
@@ -46,7 +46,40 @@ import org.apache.hadoop.io.WritableUtil
 @InterfaceAudience.Private
 public class HLogKey implements WritableComparable<HLogKey> {
   // should be < 0 (@see #readFields(DataInput))
-  private static final int VERSION = -1;
+  // Version -2 (COMPRESSED) adds HLog compression support.
+  enum Version {
+    UNVERSIONED(0),
+    // Initial number we put on HLogKey when we introduced versioning.
+    INITIAL(-1),
+    // Version -2 introduced a dictionary compression facility.  Only this
+    // dictionary-based compression is available in version -2.
+    COMPRESSED(-2);
+
+    final int code;
+    static final Version[] byCode;
+    static {
+      byCode = Version.values();
+      for (int i = 0; i < byCode.length; i++) {
+        if (byCode[i].code != -1 * i) {
+          throw new AssertionError("Values in this enum should be descending by one");
+        }
+      }
+    }
+
+    Version(int code) {
+      this.code = code;
+    }
+
+    boolean atLeast(Version other) {
+      return code <= other.code;
+    }
+
+    static Version fromCode(int code) {
+      return byCode[code * -1];
+    }
+  }
+
+  private static final Version VERSION = Version.COMPRESSED;
 
   //  The encoded region name.
   private byte [] encodedRegionName;
@@ -57,7 +90,9 @@ public class HLogKey implements Writable
 
   private UUID clusterId;
 
-  /** Writable Consructor -- Do not use. */
+  private CompressionContext compressionContext;
+
+  /** Writable Constructor -- Do not use. */
   public HLogKey() {
     this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
         HConstants.DEFAULT_CLUSTER_ID);
@@ -84,6 +119,13 @@ public class HLogKey implements Writable
     this.clusterId = clusterId;
   }
 
+  /**
+   * @param compressionContext Compression context to use
+   */
+  public void setCompressionContext(CompressionContext compressionContext) {
+    this.compressionContext = compressionContext;
+  }
+
   /** @return encoded region name */
   public byte [] getEncodedRegionName() {
     return encodedRegionName;
@@ -215,9 +257,17 @@ public class HLogKey implements Writable
 
   @Override
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, VERSION);
-    Bytes.writeByteArray(out, this.encodedRegionName);
-    Bytes.writeByteArray(out, this.tablename);
+    WritableUtils.writeVInt(out, VERSION.code);
+    if (compressionContext == null) {
+      Bytes.writeByteArray(out, this.encodedRegionName);
+      Bytes.writeByteArray(out, this.tablename);
+    } else {
+      Compressor.writeCompressed(this.encodedRegionName, 0,
+          this.encodedRegionName.length, out,
+          compressionContext.regionDict);
+      Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out,
+          compressionContext.tableDict);
+    }
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
     // avoid storing 16 bytes when replication is not enabled
@@ -232,7 +282,7 @@ public class HLogKey implements Writable
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    int version = 0;
+    Version version = Version.UNVERSIONED;
     // HLogKey was not versioned in the beginning.
     // In order to introduce it now, we make use of the fact
     // that encodedRegionName was written with Bytes.writeByteArray,
@@ -244,16 +294,26 @@ public class HLogKey implements Writable
     int len = WritableUtils.readVInt(in);
     if (len < 0) {
       // what we just read was the version
-      version = len;
-      len = WritableUtils.readVInt(in);
+      version = Version.fromCode(len);
+      // Only COMPRESSED (version -2) HLogKeys carry compressed fields.
+      // If compression is on, the length is handled by the dictionary.
+      if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
+        len = WritableUtils.readVInt(in);
+      }
+    }
+    if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
+      this.encodedRegionName = new byte[len];
+      in.readFully(this.encodedRegionName);
+      this.tablename = Bytes.readByteArray(in);
+    } else {
+      this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
+      this.tablename = Compressor.readCompressed(in, compressionContext.tableDict);
     }
-    this.encodedRegionName = new byte[len];
-    in.readFully(this.encodedRegionName);
-    this.tablename = Bytes.readByteArray(in);
+    
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
-    if (version < 0) {
+    if (version.atLeast(Version.INITIAL)) {
       if (in.readBoolean()) {
         this.clusterId = new UUID(in.readLong(), in.readLong());
       }

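Because the enum's codes descend by exactly one from zero, fromCode() can index values() by simply negating the code, and atLeast() compares with <= because newer versions have more negative codes. Concretely (a sketch from within HLogKey, since the enum is package-private):

    assert Version.fromCode(0)  == Version.UNVERSIONED;
    assert Version.fromCode(-1) == Version.INITIAL;
    assert Version.fromCode(-2) == Version.COMPRESSED;
    // "At least as new as" means a smaller (more negative) code:
    assert Version.COMPRESSED.atLeast(Version.INITIAL);   // -2 <= -1
    assert !Version.UNVERSIONED.atLeast(Version.INITIAL); //  0 <= -1 is false
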
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Compression class for {@link KeyValue}s written to the WAL. This is not
+ * synchronized, so synchronization should be handled outside.
+ * 
+ * The class only compresses and uncompresses row keys, family names, and
+ * qualifiers.  More fields may be added depending on use patterns.
+ */
+class KeyValueCompression {
+  /**
+   * Uncompresses a KeyValue from a DataInput and returns it.
+   * 
+   * @param in the DataInput
+   * @param readContext the compressionContext to use.
+   * @return an uncompressed KeyValue
+   * @throws IOException
+   */
+
+  public static KeyValue readKV(DataInput in, CompressionContext readContext)
+      throws IOException {
+    int keylength = WritableUtils.readVInt(in);
+    int vlength = WritableUtils.readVInt(in);
+    int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
+
+    byte[] backingArray = new byte[length];
+    int pos = 0;
+    pos = Bytes.putInt(backingArray, pos, keylength);
+    pos = Bytes.putInt(backingArray, pos, vlength);
+
+    // the row
+    int elemLen = Compressor.uncompressIntoArray(backingArray,
+        pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict);
+    checkLength(elemLen, Short.MAX_VALUE);
+    pos = Bytes.putShort(backingArray, pos, (short)elemLen);
+    pos += elemLen;
+
+    // family
+    elemLen = Compressor.uncompressIntoArray(backingArray,
+        pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict);
+    checkLength(elemLen, Byte.MAX_VALUE);
+    pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
+    pos += elemLen;
+
+    // qualifier
+    elemLen = Compressor.uncompressIntoArray(backingArray, pos, in,
+        readContext.qualifierDict);
+    pos += elemLen;
+
+    // the rest
+    in.readFully(backingArray, pos, length - pos);
+
+    return new KeyValue(backingArray);
+  }
+
+  private static void checkLength(int len, int max) throws IOException {
+    if (len < 0 || len > max) {
+      throw new IOException(
+          "Invalid length for compressed portion of keyvalue: " + len);
+    }
+  }
+
+  /**
+   * Compresses and writes the given KeyValue to out, a DataOutput.
+   * 
+   * @param out the DataOutput
+   * @param keyVal the KV to compress and write
+   * @param writeContext the compressionContext to use.
+   * @throws IOException
+   */
+  public static void writeKV(final DataOutput out, KeyValue keyVal,
+      CompressionContext writeContext) throws IOException {
+    byte[] backingArray = keyVal.getBuffer();
+    int offset = keyVal.getOffset();
+
+    // we first write the KeyValue infrastructure as VInts.
+    WritableUtils.writeVInt(out, keyVal.getKeyLength());
+    WritableUtils.writeVInt(out, keyVal.getValueLength());
+
+    // now we write the row key, as the row key is likely to be repeated
+    // We save space only if we attempt to compress elements with duplicates
+    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(),
+        keyVal.getRowLength(), out, writeContext.rowDict);
+
+  
+    // now family, if it exists. if it doesn't, we write a 0 length array.
+    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(),
+        keyVal.getFamilyLength(), out, writeContext.familyDict);
+
+    // qualifier next
+    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(),
+        keyVal.getQualifierLength(), out,
+        writeContext.qualifierDict);
+
+    // now we write the rest uncompressed
+    int pos = keyVal.getTimestampOffset();
+    int remainingLength = keyVal.getLength() + offset - (pos);
+    out.write(backingArray, pos, remainingLength);
+  }
+}

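A write/read cycle, condensed from TestKeyValueCompression.runTestCycle() below (inside a method that throws Exception, since the CompressionContext constructor throws checked reflection exceptions); the only requirement is that the reading context's dictionaries replay the same sequence of entries the writing context saw:

    CompressionContext writeCtx = new CompressionContext(LRUDictionary.class);
    DataOutputBuffer out = new DataOutputBuffer();
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
        Bytes.toBytes("qual"), 12345L, Bytes.toBytes("value"));
    KeyValueCompression.writeKV(out, kv, writeCtx);

    CompressionContext readCtx = new CompressionContext(LRUDictionary.class);
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(out.getData(), 0, out.getLength()));
    KeyValue roundTripped = KeyValueCompression.readKV(in, readCtx);
    assert kv.equals(roundTripped);
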
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * WAL dictionary using an LRU eviction algorithm: a linked list running
+ * through a hashtable.  Currently has a max of 2^15 entries; it starts
+ * evicting once it exceeds this number.  The maximum memory we expect this
+ * dictionary to take in the worst case is about:
+ * <code>(2 ^ 15) * 5 (region name, row key, CF, column qual, table) * 100 bytes (these are some big names) = ~16MB</code>.
+ * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
+ */
+@InterfaceAudience.Private
+public class LRUDictionary implements Dictionary {
+  private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();
+
+  @Override
+  public byte[] getEntry(short idx) {
+    return backingStore.get(idx);
+  }
+
+  @Override
+  public short findEntry(byte[] data, int offset, int length) {
+    short ret = backingStore.findIdx(data, offset, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      addEntry(data, offset, length);
+    }
+    return ret;
+  }
+
+  @Override
+  public short addEntry(byte[] data, int offset, int length) {
+    if (length <= 0) return NOT_IN_DICTIONARY;
+    return backingStore.put(data, offset, length);
+  }
+
+  @Override
+  public void clear() {
+    backingStore.clear();
+  }
+
+  /*
+   * Internal class used to implement LRU eviction and dual lookup (by key and
+   * value).
+   * 
+   * This is not thread safe. Don't use in multi-threaded applications.
+   */
+  static class BidirectionalLRUMap {
+    static final int MAX_SIZE = Short.MAX_VALUE;
+    private int currSize = 0;
+
+    // Head and tail of the LRU list.
+    private Node head;
+    private Node tail;
+
+    private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+    private Node[] indexToNode = new Node[MAX_SIZE];
+
+    public BidirectionalLRUMap() {
+      for (int i = 0; i < MAX_SIZE; i++) {
+        indexToNode[i] = new Node();
+      }
+    }
+
+    private short put(byte[] array, int offset, int length) {
+      // We copy the bytes we want, otherwise we might be holding references to
+      // massive arrays in our dictionary (or those arrays might change)
+      byte[] stored = new byte[length];
+      Bytes.putBytes(stored, 0, array, offset, length);
+
+      if (currSize < MAX_SIZE) {
+        // There is space to add without evicting.
+        indexToNode[currSize].setContents(stored, 0, stored.length);
+        setHead(indexToNode[currSize]);
+        short ret = (short) currSize++;
+        nodeToIndex.put(indexToNode[ret], ret);
+        return ret;
+      } else {
+        short s = nodeToIndex.remove(tail);
+        tail.setContents(stored, 0, stored.length);
+        // we need to rehash this.
+        nodeToIndex.put(tail, s);
+        moveToHead(tail);
+        return s;
+      }
+    }
+
+    private short findIdx(byte[] array, int offset, int length) {
+      Short s;
+      final Node comparisonNode = new Node();
+      comparisonNode.setContents(array, offset, length);
+      if ((s = nodeToIndex.get(comparisonNode)) != null) {
+        moveToHead(indexToNode[s]);
+        return s;
+      } else {
+        return -1;
+      }
+    }
+
+    private byte[] get(short idx) {
+      Preconditions.checkElementIndex(idx, currSize);
+      moveToHead(indexToNode[idx]);
+      return indexToNode[idx].container;
+    }
+
+    private void moveToHead(Node n) {
+      if (head == n) {
+        // no-op -- it's already the head.
+        return;
+      }
+      // At this point we definitely have prev, since it's not the head.
+      assert n.prev != null;
+      // Unlink prev.
+      n.prev.next = n.next;
+
+      // Unlink next
+      if (n.next != null) {
+        n.next.prev = n.prev;
+      } else {
+        assert n == tail;
+        tail = n.prev;
+      }
+      // Node is now removed from the list. Re-add it at the head.
+      setHead(n);
+    }
+    
+    private void setHead(Node n) {
+      // assume it's already unlinked from the list at this point.
+      n.prev = null;
+      n.next = head;
+      if (head != null) {
+        assert head.prev == null;
+        head.prev = n;
+      }
+
+      head = n;
+
+      // First entry
+      if (tail == null) {
+        tail = n;
+      }
+    }
+
+    private void clear() {
+      currSize = 0;
+      nodeToIndex.clear();
+      tail = null;
+      head = null;
+
+      for (Node n : indexToNode) {
+        n.container = null;
+      }
+
+      for (int i = 0; i < MAX_SIZE; i++) {
+        indexToNode[i].next = null;
+        indexToNode[i].prev = null;
+      }
+    }
+
+    private static class Node {
+      byte[] container;
+      int offset;
+      int length;
+      Node next; // link towards the tail
+      Node prev; // link towards the head
+
+      public Node() {
+      }
+
+      private void setContents(byte[] container, int offset, int length) {
+        this.container = container;
+        this.offset = offset;
+        this.length = length;
+      }
+
+      @Override
+      public int hashCode() {
+        return Bytes.hashCode(container, offset, length);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        if (!(other instanceof Node)) {
+          return false;
+        }
+
+        Node casted = (Node) other;
+        return Bytes.equals(container, offset, length, casted.container,
+            casted.offset, casted.length);
+      }
+    }
+  }
+}

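Eviction recycles indices rather than growing: once the map holds MAX_SIZE entries, put() overwrites the tail node (the least recently touched entry) and re-registers that node's existing slot under the new bytes. TestLRUDictionary.TestLRUPolicy() below exercises this at full scale; in miniature (from within the package, since MAX_SIZE is package-private):

    LRUDictionary dict = new LRUDictionary();
    // Fill to capacity; every findEntry miss also adds.
    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
      byte[] b = Bytes.toBytes(i);
      dict.findEntry(b, 0, b.length);
    }
    byte[] extra = Bytes.toBytes(-1);
    // One further miss evicts the least recently touched entry
    // and reuses its index for the new bytes.
    dict.findEntry(extra, 0, extra.length);
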
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Thu Mar 15 19:32:01 2012
@@ -22,11 +22,8 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.FilterInputStream;
 import java.io.IOException;
-import java.lang.Class;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,7 +55,6 @@ public class SequenceFileLogReader imple
     WALReader(final FileSystem fs, final Path p, final Configuration c)
     throws IOException {
       super(fs, p, c);
-
     }
 
     @Override
@@ -70,6 +66,15 @@ public class SequenceFileLogReader imple
     }
 
     /**
+     * Call this method after init() has been executed
+     * 
+     * @return whether WAL compression is enabled
+     */
+    public boolean isWALCompressionEnabled() {
+      return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata());
+    }
+
+    /**
      * Override just so can intercept first call to getPos.
      */
     static class WALReaderFSDataInputStream extends FSDataInputStream {
@@ -136,10 +141,15 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
+
   // Needed for logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+  /**
+   * Compression context to use while reading.  Null if compression is off.
+   */
+  private CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -159,19 +169,35 @@ public class SequenceFileLogReader imple
     this.keyClass = keyClass;
   }
 
-
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
       throws IOException {
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+
+    // If compression is enabled, new dictionaries are created here.
+    boolean compression = reader.isWALCompressionEnabled();
+    if (compression) {
+      try {
+        if (compressionContext == null) {
+          compressionContext = new CompressionContext(LRUDictionary.class);
+        } else {
+          compressionContext.clear();
+        }
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize CompressionContext", e);
+      }
+    }
   }
 
   @Override
   public void close() throws IOException {
     try {
-      reader.close();
+      if (reader != null) {
+        this.reader.close();
+        this.reader = null;
+      }
     } catch (IOException ioe) {
       throw addFileInfoToException(ioe);
     }
@@ -205,6 +231,9 @@ public class SequenceFileLogReader imple
     }
     boolean b = false;
     try {
+      if (compressionContext != null) {
+        e.setCompressionContext(compressionContext);
+      }
       b = this.reader.next(e.getKey(), e.getEdit());
     } catch (IOException ioe) {
       throw addFileInfoToException(ioe);
@@ -259,4 +288,4 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
-}
+}
\ No newline at end of file

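Readers need no configuration flag at all: init() sniffs the SequenceFile metadata via isWALCompressionEnabled() and builds dictionaries only when the file says so. Reading a log back, compressed or not, therefore looks the same; a sketch assuming fs, walPath, and conf are already set up:

    SequenceFileLogReader reader = new SequenceFileLogReader();
    reader.init(fs, walPath, conf); // detects compression from file metadata
    try {
      HLog.Entry entry;
      while ((entry = reader.next()) != null) {
        System.out.println(entry);  // prints key "=" edit per Entry.toString()
      }
    } finally {
      reader.close();
    }
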
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu Mar 15 19:32:01 2012
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -45,6 +48,16 @@ import org.apache.hadoop.io.compress.Def
  */
 @InterfaceAudience.Private
 public class SequenceFileLogWriter implements HLog.Writer {
+  static final Text WAL_VERSION_KEY = new Text("version");
+  // The current version is 1; absence of a version meta tag means old, version 0.
+  // Version 1 is the version that introduces compression,
+  // hence the name COMPRESSION_VERSION.
+  private static final int COMPRESSION_VERSION = 1;
+  static final int VERSION = COMPRESSION_VERSION;
+  static final Text WAL_VERSION = new Text("" + VERSION);
+  static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
+  static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
+
   private final Log LOG = LogFactory.getLog(this.getClass());
   // The sequence file we delegate to.
   private SequenceFile.Writer writer;
@@ -54,6 +67,13 @@ public class SequenceFileLogWriter imple
 
   private Class<? extends HLogKey> keyClass;
 
+  /**
+   * Context used by our WAL dictionary compressor.  Null if we're not to do
+   * our custom dictionary compression.  This custom WAL compression is distinct
+   * from SequenceFile-native compression.
+   */
+  private CompressionContext compressionContext;
+
   private Method syncFs = null;
   private Method hflush = null;
 
@@ -74,9 +94,56 @@ public class SequenceFileLogWriter imple
     this.keyClass = keyClass;
   }
 
+  /**
+   * Create sequence file Metadata for our WAL file with version and compression
+   * type (if any).
+   * @param conf
+   * @param compress
+   * @return Metadata instance.
+   */
+  private static Metadata createMetadata(final Configuration conf,
+      final boolean compress) {
+    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
+    metaMap.put(WAL_VERSION_KEY, WAL_VERSION);
+    if (compress) {
+      // Currently we only do one compression type.
+      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
+    }
+    return new Metadata(metaMap);
+  }
+
+  /**
+   * Checks the given SequenceFile metadata for our WAL compression markers.
+   * 
+   * @return whether WAL compression is enabled
+   */
+  static boolean isWALCompressionEnabled(final Metadata metadata) {
+    // Check that the version is at least COMPRESSION_VERSION.
+    Text txt = metadata.get(WAL_VERSION_KEY);
+    if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
+      return false;
+    }
+    // Now check that compression type is present.  Currently only one value.
+    txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
+    return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
+  }
+
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
   throws IOException {
+    // Should we do our custom WAL compression?
+    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    if (compress) {
+      try {
+        if (this.compressionContext == null) {
+          this.compressionContext = new CompressionContext(LRUDictionary.class);
+        } else {
+          this.compressionContext.clear();
+        }
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize CompressionContext", e);
+      }
+    }
 
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
@@ -101,7 +168,7 @@ public class SequenceFileLogWriter imple
                 fs.getDefaultBlockSize())),
             Boolean.valueOf(false) /*createParent*/,
             SequenceFile.CompressionType.NONE, new DefaultCodec(),
-            new Metadata()
+            createMetadata(conf, compress)
             });
     } catch (InvocationTargetException ite) {
      // function was properly called, but threw its own exception
@@ -123,7 +190,7 @@ public class SequenceFileLogWriter imple
         SequenceFile.CompressionType.NONE,
         new DefaultCodec(),
         null,
-        new Metadata());
+        createMetadata(conf, compress));
     } else {
       LOG.debug("using new createWriter -- HADOOP-6840");
     }
@@ -133,7 +200,8 @@ public class SequenceFileLogWriter imple
     this.hflush = getHFlush();
     String msg = "Path=" + path +
       ", syncFs=" + (this.syncFs != null) +
-      ", hflush=" + (this.hflush != null);
+      ", hflush=" + (this.hflush != null) +
+      ", compression=" + compress;
     if (this.syncFs != null || this.hflush != null) {
       LOG.debug(msg);
     } else {
@@ -207,6 +275,7 @@ public class SequenceFileLogWriter imple
 
   @Override
   public void append(HLog.Entry entry) throws IOException {
+    entry.setCompressionContext(compressionContext);
     this.writer.append(entry.getKey(), entry.getEdit());
   }
 

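The on-disk marker is plain SequenceFile metadata: a "version" key of at least 1 plus a "compression.type" key equal to "dictionary". Both checks in isWALCompressionEnabled() must pass, which keeps older, unversioned logs readable. A sketch from within the package (the Text constants and the method are package-private):

    TreeMap<Text, Text> m = new TreeMap<Text, Text>();
    m.put(SequenceFileLogWriter.WAL_VERSION_KEY, SequenceFileLogWriter.WAL_VERSION);
    m.put(SequenceFileLogWriter.WAL_COMPRESSION_TYPE_KEY,
        SequenceFileLogWriter.DICTIONARY_COMPRESSION_TYPE);
    // Both markers present: compression detected.
    assert SequenceFileLogWriter.isWALCompressionEnabled(new Metadata(m));
    // The version alone is not enough.
    m.remove(SequenceFileLogWriter.WAL_COMPRESSION_TYPE_KEY);
    assert !SequenceFileLogWriter.isWALCompressionEnabled(new Metadata(m));
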
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Thu Mar 15 19:32:01 2012
@@ -76,9 +76,15 @@ public class WALEdit implements Writable
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
   private NavigableMap<byte[], Integer> scopes;
 
+  private CompressionContext compressionContext;
+
   public WALEdit() {
   }
 
+  public void setCompressionContext(final CompressionContext compressionContext) {
+    this.compressionContext = compressionContext;
+  }
+
   public void add(KeyValue kv) {
     this.kvs.add(kv);
   }
@@ -116,9 +122,13 @@ public class WALEdit implements Writable
       // this is new style HLog entry containing multiple KeyValues.
       int numEdits = in.readInt();
       for (int idx = 0; idx < numEdits; idx++) {
-        KeyValue kv = new KeyValue();
-        kv.readFields(in);
-        this.add(kv);
+        if (compressionContext != null) {
+          this.add(KeyValueCompression.readKV(in, compressionContext));
+        } else {
+          KeyValue kv = new KeyValue();
+          kv.readFields(in);
+          this.add(kv);
+        }
       }
       int numFamilies = in.readInt();
       if (numFamilies > 0) {
@@ -133,7 +143,7 @@ public class WALEdit implements Writable
       }
     } else {
       // this is an old style HLog entry. The int that we just
-      // read is actually the length of a single KeyValue.
+      // read is actually the length of a single KeyValue
       KeyValue kv = new KeyValue();
       kv.readFields(versionOrLength, in);
       this.add(kv);
@@ -146,7 +156,11 @@ public class WALEdit implements Writable
     out.writeInt(kvs.size());
     // We interleave the two lists for code simplicity
     for (KeyValue kv : kvs) {
-      kv.write(out);
+      if (compressionContext != null) {
+        KeyValueCompression.writeKV(out, kv, compressionContext);
+      } else {
+        kv.write(out);
+      }
     }
     if (scopes == null) {
       out.writeInt(0);

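A null context is the compatibility path: write() and readFields() fall back to plain KeyValue serialization, so uncompressed logs keep working unchanged. With a context set, every KeyValue funnels through KeyValueCompression; a sketch reusing kv, ctx, and a DataOutput out from the sketches above:

    WALEdit edit = new WALEdit();
    edit.add(kv);
    edit.setCompressionContext(ctx);  // null would mean plain kv.write(out)
    edit.write(out);                  // each KV goes through KeyValueCompression.writeKV
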
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Thu Mar 15 19:32:01 2012
@@ -1461,6 +1461,18 @@ public class Bytes {
   }
 
   /**
+   * @param bytes array to hash
+   * @param offset offset to start from
+   * @param length length to hash
+   */
+  public static int hashCode(byte[] bytes, int offset, int length) {
+    int hash = 1;
+    for (int i = offset; i < offset + length; i++)
+      hash = (31 * hash) + (int) bytes[i];
+    return hash;
+  }
+
+  /**
    * @param t operands
    * @return Array of byte arrays made from passed array of Text
    */

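The new Bytes.hashCode(byte[], int, int) is the subrange analogue of java.util.Arrays.hashCode(byte[]) (the same 31-based polynomial), which is what lets LRUDictionary's Node hash a slice of a shared buffer without copying it first:

    byte[] buf = Bytes.toBytes("xxrow1xx");
    // Hashing the middle slice equals hashing a standalone copy of it.
    int sliceHash = Bytes.hashCode(buf, 2, 4);
    int copyHash = java.util.Arrays.hashCode(Bytes.toBytes("row1"));
    assert sliceHash == copyHash;
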
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test our compressor class.
+ */
+@Category(SmallTests.class)
+public class TestCompressor {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @Test
+  public void testToShort() {
+    short s = 1;
+    assertEquals(s, Compressor.toShort((byte)0, (byte)1));
+    s <<= 8;
+    assertEquals(s, Compressor.toShort((byte)1, (byte)0));
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testNegativeToShort() {
+    Compressor.toShort((byte)0xff, (byte)0xff);
+  }
+
+  @Test
+  public void testCompressingWithNullDictionaries() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    byte [] blahBytes = Bytes.toBytes("blah");
+    Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, null);
+    dos.close();
+    byte [] dosbytes = baos.toByteArray();
+    DataInputStream dis =
+      new DataInputStream(new ByteArrayInputStream(dosbytes));
+    byte [] product = Compressor.readCompressed(dis, null);
+    assertTrue(Bytes.equals(blahBytes, product));
+  }
+
+  @Test
+  public void testCompressingWithClearDictionaries() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    Dictionary dictionary = new LRUDictionary();
+    byte [] blahBytes = Bytes.toBytes("blah");
+    Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
+    dos.close();
+    byte [] dosbytes = baos.toByteArray();
+    DataInputStream dis =
+      new DataInputStream(new ByteArrayInputStream(dosbytes));
+    dictionary = new LRUDictionary();
+    byte [] product = Compressor.readCompressed(dis, dictionary);
+    assertTrue(Bytes.equals(blahBytes, product));
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestKeyValueCompression {
+  private static final byte[] VALUE = Bytes.toBytes("fake value");
+  private static final int BUF_SIZE = 256*1024;
+  
+  @Test
+  public void testCountingKVs() throws Exception {
+    List<KeyValue> kvs = Lists.newArrayList();
+    for (int i = 0; i < 400; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      byte[] fam = Bytes.toBytes("fam" + i);
+      byte[] qual = Bytes.toBytes("qual" + i);
+      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
+    }
+    
+    runTestCycle(kvs);
+  }
+  
+  @Test
+  public void testRepeatingKVs() throws Exception {
+    List<KeyValue> kvs = Lists.newArrayList();
+    for (int i = 0; i < 400; i++) {
+      byte[] row = Bytes.toBytes("row" + (i % 10));
+      byte[] fam = Bytes.toBytes("fam" + (i % 127));
+      byte[] qual = Bytes.toBytes("qual" + (i % 128));
+      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
+    }
+    
+    runTestCycle(kvs);
+  }
+
+  private void runTestCycle(List<KeyValue> kvs) throws Exception {
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
+    DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
+    for (KeyValue kv : kvs) {
+      KeyValueCompression.writeKV(buf, kv, ctx);
+    }
+
+    ctx.clear();
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(
+        buf.getData(), 0, buf.getLength()));
+    for (KeyValue kv : kvs) {
+      KeyValue readBack = KeyValueCompression.readKV(in, ctx);
+      assertEquals(kv, readBack);
+    }
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+  LRUDictionary testee;
+
+  @Before
+  public void setUp() throws Exception {
+    testee = new LRUDictionary();
+  }
+
+  @Test
+  public void TestContainsNothing() {
+    assertTrue(isDictionaryEmpty(testee));
+  }
+
+  /**
+   * Assert can't add empty array.
+   */
+  @Test
+  public void testPassingEmptyArrayToFindEntry() {
+    assertEquals(Dictionary.NOT_IN_DICTIONARY,
+      testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+    assertEquals(Dictionary.NOT_IN_DICTIONARY,
+      testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+  }
+
+  @Test
+  public void testPassingSameArrayToAddEntry() {
+    // Add a predefined byte array, in this case a byte array from
+    // HConstants.  Assert that each add returns a new index.  That's how it
+    // works.
+    int len = HConstants.CATALOG_FAMILY.length;
+    int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+  }
+
+  @Test
+  public void testBasic() {
+    Random rand = new Random();
+    byte[] testBytes = new byte[10];
+    rand.nextBytes(testBytes);
+
+    // Verify that our randomly generated array doesn't exist in the dictionary
+    assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+    // now since we looked up an entry, we should have added it to the
+    // dictionary, so it isn't empty
+
+    assertFalse(isDictionaryEmpty(testee));
+
+    // Check if we can find it using findEntry
+    short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+    // Making sure we do find what we're looking for
+    assertTrue(t != -1);
+
+    byte[] testBytesCopy = new byte[20];
+
+    Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+    // copy byte arrays, make sure that we check that equal byte arrays are
+    // equal without just checking the reference
+    assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+    // make sure the entry retrieved is the same as the one put in
+    assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+    testee.clear();
+
+    // making sure clear clears the dictionary
+    assertTrue(isDictionaryEmpty(testee));
+  }
+
+  @Test
+  public void TestLRUPolicy() {
+    // start by filling the dictionary up with byte arrays
+    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+      testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+          (BigInteger.valueOf(i)).toByteArray().length);
+    }
+
+    // check we have the first element added
+    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+        BigInteger.ZERO.toByteArray().length) != -1);
+
+    // check for an element we know isn't there
+    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+    // since we just checked for this element, it should be there now.
+    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+    // test eviction, that the least recently added or looked at element is
+    // evicted.  We looked at ZERO so it should be in the dictionary still.
+    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+      BigInteger.ZERO.toByteArray().length) != -1);
+    // Now walk 1 to the end: each lookup misses (1 was evicted; each miss re-adds, evicting the next).
+    for (int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+          BigInteger.valueOf(i).toByteArray().length) == -1);
+    }
+
+    // check we can find all of these.
+    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+          BigInteger.valueOf(i).toByteArray().length) != -1);
+    }
+  }
+
+  static private boolean isDictionaryEmpty(LRUDictionary dict) {
+    try {
+      dict.getEntry((short)0);
+      return false;
+    } catch (IndexOutOfBoundsException ioobe) {
+      return true;
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1301165&r1=1301164&r2=1301165&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Thu Mar 15 19:32:01 2012
@@ -61,7 +61,7 @@ import org.mockito.Mockito;
 @Category(MediumTests.class)
 public class TestWALReplay {
   public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
   private Path hbaseRootDir = null;
   private Path oldLogDir;

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java?rev=1301165&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java Thu Mar 15 19:32:01 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Enables compression and runs the TestWALReplay tests.
+ */
+@Category(MediumTests.class)
+public class TestWALReplayCompressed extends TestWALReplay {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALReplay.setUpBeforeClass();
+    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+  }
+
+}


