accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject svn commit: r1411745 - in /accumulo/trunk: ./ core/src/main/java/org/apache/accumulo/core/file/rfile/ core/src/test/java/org/apache/accumulo/core/file/rfile/
Date Tue, 20 Nov 2012 17:00:22 GMT
Author: ctubbsii
Date: Tue Nov 20 17:00:21 2012
New Revision: 1411745

URL: http://svn.apache.org/viewvc?rev=1411745&view=rev
Log:
ACCUMULO-790 Added prefix compression for relative key files, per key component

Modified:
    accumulo/trunk/.gitignore
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java

Modified: accumulo/trunk/.gitignore
URL: http://svn.apache.org/viewvc/accumulo/trunk/.gitignore?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/.gitignore (original)
+++ accumulo/trunk/.gitignore Tue Nov 20 17:00:21 2012
@@ -83,6 +83,7 @@
 /core/.classpath
 
 # /examples/
+/examples/lib
 /examples/target
 /examples/.settings
 /examples/.classpath
@@ -125,6 +126,12 @@
 /trace/.project
 /trace/.settings
 
+# /test/
+/test/target
+/test/.classpath
+/test/.project
+/test/.settings
+
 # /test/system/auto/
 /test/system/auto/*.pyc
 /test/system/auto/fake_disk_failure.so

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java Tue Nov 20 17:00:21 2012
@@ -58,13 +58,15 @@ public class BlockIndex {
   public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
     
     private Key key;
+    private Key prevKey;
     private int entriesLeft;
     private int pos;
     
-    public BlockIndexEntry(int pos, int entriesLeft, Key key) {
+    public BlockIndexEntry(int pos, int entriesLeft, Key key, Key prevKey) {
       this.pos = pos;
       this.entriesLeft = entriesLeft;
       this.key = key;
+      this.prevKey = prevKey;
     }
 
     /**
@@ -87,14 +89,18 @@ public class BlockIndex {
       return key.compareTo(o.key);
     }
     
+    @Override
     public String toString() {
       return key + " " + entriesLeft + " " + pos;
     }
+    
+    public Key getPrevKey() {
+      return prevKey;
+    }
   }
   
   public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
 
-    
     // get a local ref to the index, another thread could change it
     BlockIndexEntry[] blockIndex = this.blockIndex;
     
@@ -150,12 +156,13 @@ public class BlockIndex {
 
     while (count < (indexEntry.getNumEntries() - interval + 1)) {
 
+      Key myPrevKey = rk.getKey();
       int pos = cacheBlock.getPosition();
       rk.readFields(cacheBlock);
       val.readFields(cacheBlock);
 
       if (count > 0 && count % interval == 0) {
-        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey()));
+        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey(), myPrevKey));
       }
       
       count++;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java Tue Nov 20 17:00:21 2012
@@ -252,7 +252,7 @@ public class MultiLevelIndex {
     
     public void readFields(DataInput in, int version) throws IOException {
       
-      if (version == RFile.RINDEX_VER_6) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
         level = in.readInt();
         offset = in.readInt();
         hasNext = in.readBoolean();
@@ -723,7 +723,7 @@ public class MultiLevelIndex {
       
       size = 0;
       
-      if (version == RFile.RINDEX_VER_6) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
         size = in.readInt();
       }
       

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Tue Nov 20 17:00:21 2012
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfi
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -78,7 +79,9 @@ public class RFile {
   private RFile() {}
   
   private static final int RINDEX_MAGIC = 0x20637474;
+  static final int RINDEX_VER_7 = 7;
   static final int RINDEX_VER_6 = 6;
+  // static final int RINDEX_VER_5 = 5; // unreleased
   static final int RINDEX_VER_4 = 4;
   static final int RINDEX_VER_3 = 3;
   
@@ -327,6 +330,7 @@ public class RFile {
       previousColumnFamilies = new HashSet<ByteSequence>();
     }
     
+    @Override
     public synchronized void close() throws IOException {
       
       if (closed) {
@@ -338,7 +342,7 @@ public class RFile {
       ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
       
       mba.writeInt(RINDEX_MAGIC);
-      mba.writeInt(RINDEX_VER_6);
+      mba.writeInt(RINDEX_VER_7);
       
       if (currentLocalityGroup != null)
         localityGroups.add(currentLocalityGroup);
@@ -369,6 +373,7 @@ public class RFile {
       }
     }
     
+    @Override
     public void append(Key key, Value value) throws IOException {
       
       if (dataClosed) {
@@ -685,14 +690,12 @@ public class RFile {
           // and speed up others.
 
           MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
-          RelativeKey tmpRk = new RelativeKey();
-          Key pKey = new Key(getTopKey());
-          int fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, pKey, getTopKey());
-          if (fastSkipped > 0) {
-            entriesLeft -= fastSkipped;
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
+          if (skippr.skipped > 0) {
+            entriesLeft -= skippr.skipped;
             val = new Value(valbs.toArray());
-            prevKey = pKey;
-            rk = tmpRk;
+            prevKey = skippr.prevKey;
+            rk = skippr.rk;
           }
           
           reseek = false;
@@ -705,8 +708,6 @@ public class RFile {
         }
       }
       
-      int fastSkipped = -1;
-      
       if (reseek) {
         iiter = index.lookup(startKey);
         
@@ -735,7 +736,6 @@ public class RFile {
           if (!checkRange)
             hasTop = true;
 
-          RelativeKey tmpRk = new RelativeKey();
           MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
 
           Key currKey = null;
@@ -747,7 +747,8 @@ public class RFile {
               if (bie != null) {
                 // we are seeked to the current position of the key in the index
                 // need to prime the read process and read this key from the block
-                tmpRk.setPrevKey(bie.getKey());
+                RelativeKey tmpRk = new RelativeKey();
+                tmpRk.setPrevKey(bie.getPrevKey());
                 tmpRk.readFields(currBlock);
                 val = new Value();
 
@@ -756,18 +757,19 @@ public class RFile {
                 
                 // just consumed one key from the input stream, so subtract one from entries left
                 entriesLeft = bie.getEntriesLeft() - 1;
-                prevKey = new Key(bie.getKey());
+                prevKey = new Key(bie.getPrevKey());
                 currKey = bie.getKey();
               }
             }
           }
 
-          fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
-          entriesLeft -= fastSkipped;
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
+          prevKey = skippr.prevKey;
+          entriesLeft -= skippr.skipped;
           val = new Value(valbs.toArray());
           // set rk when everything above is successful, if exception
           // occurs rk will not be set
-          rk = tmpRk;
+          rk = skippr.rk;
         }
       }
       
@@ -842,7 +844,7 @@ public class RFile {
       
       if (magic != RINDEX_MAGIC)
         throw new IOException("Did not see expected magic number, saw " + magic);
-      if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+      if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
         throw new IOException("Did not see expected version, saw " + ver);
       
       int size = mb.readInt();

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java Tue Nov 20 17:00:21 2012
@@ -16,94 +16,115 @@
  */
 package org.apache.accumulo.core.file.rfile;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.zip.GZIPOutputStream;
 
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
-public class RelativeKey implements WritableComparable<RelativeKey> {
+public class RelativeKey implements Writable {
   
-  private Key key;
-  
-  private byte fieldsSame;
+  private static final byte BIT = 0x01;
   
+  private Key key;
   private Key prevKey;
   
-  private static final byte ROW_SAME = 0x01;
-  private static final byte CF_SAME = 0x02;
-  private static final byte CQ_SAME = 0x04;
-  private static final byte CV_SAME = 0x08;
-  private static final byte TS_SAME = 0x10;
-  private static final byte DELETED = 0x20;
-  
-  private static HashMap<Text,Integer> colFams = new HashMap<Text,Integer>();
-  
-  private static long bytesWritten = 0;
+  private byte fieldsSame;
+  private byte fieldsPrefixed;
   
-  public static void printStats() throws Exception {
-    System.out.println("colFams.size() : " + colFams.size());
-    Set<Entry<Text,Integer>> es = colFams.entrySet();
-    
-    int sum = 0;
-    
-    for (Entry<Text,Integer> entry : es) {
-      sum += entry.getKey().getLength();
-    }
-    
-    System.out.println("Total Column name bytes : " + sum);
-    
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(baos));
-    for (Entry<Text,Integer> entry : es) {
-      entry.getKey().write(dos);
-      dos.writeInt(entry.getValue());
-    }
-    
-    dos.close();
-    
-    System.out.println("Compressed column map size : " + baos.toByteArray().length);
-    System.out.printf("Bytes written : %,d%n", bytesWritten);
-    
-  }
+  // Exact match compression options (first byte) and flag for further prefix compression (second byte)
+  private static final byte ROW_SAME = BIT << 0;
+  private static final byte CF_SAME = BIT << 1;
+  private static final byte CQ_SAME = BIT << 2;
+  private static final byte CV_SAME = BIT << 3;
+  private static final byte TS_SAME = BIT << 4;
+  private static final byte DELETED = BIT << 5;
+  // private static final byte UNUSED_1_6 = BIT << 6;
+  private static final byte PREFIX_COMPRESSION_ENABLED = (byte) (BIT << 7);
+  
+  // Prefix compression (second byte)
+  private static final byte ROW_COMMON_PREFIX = BIT << 0;
+  private static final byte CF_COMMON_PREFIX = BIT << 1;
+  private static final byte CQ_COMMON_PREFIX = BIT << 2;
+  private static final byte CV_COMMON_PREFIX = BIT << 3;
+  private static final byte TS_DIFF = BIT << 4;
+  
+  // private static final byte UNUSED_2_5 = BIT << 5;
+  // private static final byte UNUSED_2_6 = BIT << 6;
+  // private static final byte UNUSED_2_7 = (byte) (BIT << 7);
+  
+  // Values for prefix compression
+  int rowCommonPrefixLen;
+  int cfCommonPrefixLen;
+  int cqCommonPrefixLen;
+  int cvCommonPrefixLen;
+  long tsDiff;
   
+  /**
+   * This constructor is used when one needs to read from an input stream
+   */
   public RelativeKey() {
     
   }
   
+  /**
+   * This constructor is used when constructing a key for writing to an output stream
+   */
   public RelativeKey(Key prevKey, Key key) {
     
     this.key = key;
     
     fieldsSame = 0;
+    fieldsPrefixed = 0;
+    
+    ByteSequence prevKeyScratch;
+    ByteSequence keyScratch;
     
     if (prevKey != null) {
-      if (prevKey.getRowData().equals(key.getRowData()))
+      
+      prevKeyScratch = prevKey.getRowData();
+      keyScratch = key.getRowData();
+      rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+      if (rowCommonPrefixLen == -1)
         fieldsSame |= ROW_SAME;
+      else if (rowCommonPrefixLen > 1)
+        fieldsPrefixed |= ROW_COMMON_PREFIX;
       
-      if (prevKey.getColumnFamilyData().equals(key.getColumnFamilyData()))
+      prevKeyScratch = prevKey.getColumnFamilyData();
+      keyScratch = key.getColumnFamilyData();
+      cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+      if (cfCommonPrefixLen == -1)
         fieldsSame |= CF_SAME;
+      else if (cfCommonPrefixLen > 1)
+        fieldsPrefixed |= CF_COMMON_PREFIX;
       
-      if (prevKey.getColumnQualifierData().equals(key.getColumnQualifierData()))
+      prevKeyScratch = prevKey.getColumnQualifierData();
+      keyScratch = key.getColumnQualifierData();
+      cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+      if (cqCommonPrefixLen == -1)
         fieldsSame |= CQ_SAME;
+      else if (cqCommonPrefixLen > 1)
+        fieldsPrefixed |= CQ_COMMON_PREFIX;
       
-      if (prevKey.getColumnVisibilityData().equals(key.getColumnVisibilityData()))
+      prevKeyScratch = prevKey.getColumnVisibilityData();
+      keyScratch = key.getColumnVisibilityData();
+      cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+      if (cvCommonPrefixLen == -1)
         fieldsSame |= CV_SAME;
+      else if (cvCommonPrefixLen > 1)
+        fieldsPrefixed |= CV_COMMON_PREFIX;
       
-      if (prevKey.getTimestamp() == key.getTimestamp())
+      tsDiff = key.getTimestamp() - prevKey.getTimestamp();
+      if (tsDiff == 0)
         fieldsSame |= TS_SAME;
+      else
+        fieldsPrefixed |= TS_DIFF;
       
+      fieldsSame |= fieldsPrefixed == 0 ? 0 : PREFIX_COMPRESSION_ENABLED;
     }
     
     // stored deleted information in bit vector instead of its own byte
@@ -111,6 +132,31 @@ public class RelativeKey implements Writ
       fieldsSame |= DELETED;
   }
   
+  /**
+   * 
+   * @return -1 (exact match) or the number of bytes in common
+   */
+  static int getCommonPrefix(ByteSequence prev, ByteSequence cur) {
+    if (prev == cur)
+      return -1; // infinite... exact match
+      
+    int prevLen = prev.length();
+    int curLen = cur.length();
+    int maxChecks = Math.min(prevLen, curLen);
+    int common = 0;
+    while (common < maxChecks) {
+      int a = prev.byteAt(common) & 0xff;
+      int b = cur.byteAt(common) & 0xff;
+      if (a != b)
+        return common;
+      common++;
+    }
+    // no differences found
+    // either exact or matches the part checked, so if they are the same length, they are an exact match,
+    // and if not, then they have a common prefix over all the checks we've done
+    return prevLen == curLen ? -1 : maxChecks;
+  }
+  
   public void setPrevKey(Key pk) {
     this.prevKey = pk;
   }
@@ -118,42 +164,56 @@ public class RelativeKey implements Writ
   @Override
   public void readFields(DataInput in) throws IOException {
     fieldsSame = in.readByte();
+    if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) {
+      fieldsPrefixed = in.readByte();
+    } else {
+      fieldsPrefixed = 0;
+    }
     
     byte[] row, cf, cq, cv;
     long ts;
     
-    if ((fieldsSame & ROW_SAME) == 0) {
-      row = read(in);
-    } else {
+    if ((fieldsSame & ROW_SAME) == ROW_SAME) {
       row = prevKey.getRowData().toArray();
+    } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
+      row = readPrefix(in, prevKey.getRowData());
+    } else {
+      row = read(in);
     }
     
-    if ((fieldsSame & CF_SAME) == 0) {
-      cf = read(in);
-    } else {
+    if ((fieldsSame & CF_SAME) == CF_SAME) {
       cf = prevKey.getColumnFamilyData().toArray();
+    } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
+      cf = readPrefix(in, prevKey.getColumnFamilyData());
+    } else {
+      cf = read(in);
     }
     
-    if ((fieldsSame & CQ_SAME) == 0) {
-      cq = read(in);
-    } else {
+    if ((fieldsSame & CQ_SAME) == CQ_SAME) {
       cq = prevKey.getColumnQualifierData().toArray();
+    } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
+      cq = readPrefix(in, prevKey.getColumnQualifierData());
+    } else {
+      cq = read(in);
     }
     
-    if ((fieldsSame & CV_SAME) == 0) {
-      cv = read(in);
-    } else {
+    if ((fieldsSame & CV_SAME) == CV_SAME) {
       cv = prevKey.getColumnVisibilityData().toArray();
+    } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
+      cv = readPrefix(in, prevKey.getColumnVisibilityData());
+    } else {
+      cv = read(in);
     }
     
-    if ((fieldsSame & TS_SAME) == 0) {
-      ts = WritableUtils.readVLong(in);
-    } else {
+    if ((fieldsSame & TS_SAME) == TS_SAME) {
       ts = prevKey.getTimestamp();
+    } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) {
+      ts = WritableUtils.readVLong(in) + prevKey.getTimestamp();
+    } else {
+      ts = WritableUtils.readVLong(in);
     }
     
-    this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) != 0, false);
-    
+    this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) == DELETED, false);
     this.prevKey = this.key;
   }
   
@@ -182,10 +242,22 @@ public class RelativeKey implements Writ
     }
   }
   
-  int fastSkip(DataInput in, Key seekKey, MByteSequence value, Key pkey, Key currKey) throws IOException {
+  public static class SkippR {
+    RelativeKey rk;
+    int skipped;
+    Key prevKey;
+    
+    SkippR(RelativeKey rk, int skipped, Key prevKey) {
+      this.rk = rk;
+      this.skipped = skipped;
+      this.prevKey = prevKey;
+    }
+  }
+  
+  public static SkippR fastSkip(DataInput in, Key seekKey, MByteSequence value, Key prevKey, Key currKey) throws IOException {
     // this method assumes that fast skip is being called on a compressed block where the last key
-    // in the compressed block is >= seekKey... therefore this method should go passed the end of the
-    // compressed block... if it does, there is probably an error in the callers logic
+    // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the
+    // compressed block... if it does, there is probably an error in the caller's logic
     
     // this method mostly avoids object allocation and only does compares when the row changes
     
@@ -204,11 +276,11 @@ public class RelativeKey implements Writ
     
     if (currKey != null) {
       
-      prow = new MByteSequence(pkey.getRowData());
-      pcf = new MByteSequence(pkey.getColumnFamilyData());
-      pcq = new MByteSequence(pkey.getColumnQualifierData());
-      pcv = new MByteSequence(pkey.getColumnVisibilityData());
-      pts = pkey.getTimestamp();
+      prow = new MByteSequence(currKey.getRowData());
+      pcf = new MByteSequence(currKey.getColumnFamilyData());
+      pcq = new MByteSequence(currKey.getColumnQualifierData());
+      pcv = new MByteSequence(currKey.getColumnVisibilityData());
+      pts = currKey.getTimestamp();
       
       row = new MByteSequence(currKey.getRowData());
       cf = new MByteSequence(currKey.getColumnFamilyData());
@@ -221,15 +293,24 @@ public class RelativeKey implements Writ
       cqCmp = cq.compareTo(stopCQ);
       
       if (rowCmp >= 0) {
-        if (rowCmp > 0)
-          return 0;
+        if (rowCmp > 0) {
+          RelativeKey rk = new RelativeKey();
+          rk.key = rk.prevKey = new Key(currKey);
+          return new SkippR(rk, 0, prevKey);
+        }
         
         if (cfCmp >= 0) {
-          if (cfCmp > 0)
-            return 0;
+          if (cfCmp > 0) {
+            RelativeKey rk = new RelativeKey();
+            rk.key = rk.prevKey = new Key(currKey);
+            return new SkippR(rk, 0, prevKey);
+          }
           
-          if (cqCmp >= 0)
-            return 0;
+          if (cqCmp >= 0) {
+            RelativeKey rk = new RelativeKey();
+            rk.key = rk.prevKey = new Key(currKey);
+            return new SkippR(rk, 0, prevKey);
+          }
         }
       }
       
@@ -246,22 +327,31 @@ public class RelativeKey implements Writ
     }
     
     byte fieldsSame = -1;
+    byte fieldsPrefixed = 0;
     int count = 0;
+    Key newPrevKey = null;
     
     while (true) {
       
-      pdel = (fieldsSame & DELETED) != 0;
+      pdel = (fieldsSame & DELETED) == DELETED;
       
       fieldsSame = in.readByte();
+      if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED)
+        fieldsPrefixed = in.readByte();
+      else
+        fieldsPrefixed = 0;
       
       boolean changed = false;
       
-      if ((fieldsSame & ROW_SAME) == 0) {
+      if ((fieldsSame & ROW_SAME) != ROW_SAME) {
         
         MByteSequence tmp = prow;
         prow = row;
         row = tmp;
         
+        if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX)
+          readPrefix(in, row, prow);
+        else
         read(in, row);
         
         // read a new row, so need to compare...
@@ -269,41 +359,54 @@ public class RelativeKey implements Writ
         changed = true;
       }// else the row is the same as the last, so no need to compare
       
-      if ((fieldsSame & CF_SAME) == 0) {
+      if ((fieldsSame & CF_SAME) != CF_SAME) {
         
         MByteSequence tmp = pcf;
         pcf = cf;
         cf = tmp;
         
+        if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX)
+          readPrefix(in, cf, pcf);
+        else
         read(in, cf);
         
         cfCmp = cf.compareTo(stopCF);
         changed = true;
       }
       
-      if ((fieldsSame & CQ_SAME) == 0) {
+      if ((fieldsSame & CQ_SAME) != CQ_SAME) {
         
         MByteSequence tmp = pcq;
         pcq = cq;
         cq = tmp;
         
+        if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX)
+          readPrefix(in, cq, pcq);
+        else
         read(in, cq);
         
         cqCmp = cq.compareTo(stopCQ);
         changed = true;
       }
       
-      if ((fieldsSame & CV_SAME) == 0) {
+      if ((fieldsSame & CV_SAME) != CV_SAME) {
         
         MByteSequence tmp = pcv;
         pcv = cv;
         cv = tmp;
         
+        if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX)
+          readPrefix(in, cv, pcv);
+        else
         read(in, cv);
       }
       
-      if ((fieldsSame & TS_SAME) == 0) {
+      if ((fieldsSame & TS_SAME) != TS_SAME) {
         pts = ts;
+        
+        if ((fieldsPrefixed & TS_DIFF) == TS_DIFF)
+          ts = WritableUtils.readVLong(in) + pts;
+        else
         ts = WritableUtils.readVLong(in);
       }
       
@@ -332,33 +435,39 @@ public class RelativeKey implements Writ
       
       // when the current keys field is same as the last, then
       // set the prev keys field the same as the current key
-      trow = (fieldsSame & ROW_SAME) == 0 ? prow : row;
-      tcf = (fieldsSame & CF_SAME) == 0 ? pcf : cf;
-      tcq = (fieldsSame & CQ_SAME) == 0 ? pcq : cq;
-      tcv = (fieldsSame & CV_SAME) == 0 ? pcv : cv;
-      tts = (fieldsSame & TS_SAME) == 0 ? pts : ts;
+      trow = (fieldsSame & ROW_SAME) == ROW_SAME ? row : prow;
+      tcf = (fieldsSame & CF_SAME) == CF_SAME ? cf : pcf;
+      tcq = (fieldsSame & CQ_SAME) == CQ_SAME ? cq : pcq;
+      tcv = (fieldsSame & CV_SAME) == CV_SAME ? cv : pcv;
+      tts = (fieldsSame & TS_SAME) == TS_SAME ? ts : pts;
       
-      Key tmp = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(),
+      newPrevKey = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(),
           tcq.offset(), tcq.length(), tcv.getBackingArray(), tcv.offset(), tcv.length(), tts);
-      tmp.setDeleted(pdel);
-      pkey.set(tmp);
+      newPrevKey.setDeleted(pdel);
+    } else if (count == 1) {
+      if (currKey != null)
+        newPrevKey = currKey;
+      else
+        newPrevKey = prevKey;
+    } else {
+      throw new IllegalStateException();
     }
     
-    this.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(),
+    RelativeKey result = new RelativeKey();
+    result.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(),
         cq.length(), cv.getBackingArray(), cv.offset(), cv.length(), ts);
-    this.key.setDeleted((fieldsSame & DELETED) != 0);
+    result.key.setDeleted((fieldsSame & DELETED) != 0);
+    result.prevKey = result.key;
     
-    this.prevKey = this.key;
-    
-    return count;
+    return new SkippR(result, count, newPrevKey);
   }
   
-  private void read(DataInput in, MByteSequence mbseq) throws IOException {
+  private static void read(DataInput in, MByteSequence mbseq) throws IOException {
     int len = WritableUtils.readVInt(in);
     read(in, mbseq, len);
   }
   
-  private void readValue(DataInput in, MByteSequence mbseq) throws IOException {
+  private static void readValue(DataInput in, MByteSequence mbseq) throws IOException {
     int len = in.readInt();
     read(in, mbseq, len);
   }
@@ -391,16 +500,49 @@ public class RelativeKey implements Writ
     return ret;
   }
 
-  private void read(DataInput in, MByteSequence mbseq, int len) throws IOException {
-    if (mbseq.getBackingArray().length < len) {
-      mbseq.setArray(new byte[nextArraySize(len)]);
+  private static void read(DataInput in, MByteSequence mbseqDestination, int len) throws IOException {
+    if (mbseqDestination.getBackingArray().length < len) {
+      mbseqDestination.setArray(new byte[nextArraySize(len)]);
+    }
+    
+    in.readFully(mbseqDestination.getBackingArray(), 0, len);
+    mbseqDestination.setLength(len);
+  }
+  
+  private static byte[] readPrefix(DataInput in, ByteSequence prefixSource) throws IOException {
+    int prefixLen = WritableUtils.readVInt(in);
+    int remainingLen = WritableUtils.readVInt(in);
+    byte[] data = new byte[prefixLen + remainingLen];
+    if (prefixSource.isBackedByArray()) {
+      System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), data, 0, prefixLen);
+    } else {
+      byte[] prefixArray = prefixSource.toArray();
+      System.arraycopy(prefixArray, 0, data, 0, prefixLen);
+    }
+    // read remaining
+    in.readFully(data, prefixLen, remainingLen);
+    return data;
     }
     
-    in.readFully(mbseq.getBackingArray(), 0, len);
-    mbseq.setLength(len);
+  private static void readPrefix(DataInput in, MByteSequence dest, ByteSequence prefixSource) throws IOException {
+    int prefixLen = WritableUtils.readVInt(in);
+    int remainingLen = WritableUtils.readVInt(in);
+    int len = prefixLen + remainingLen;
+    if (dest.getBackingArray().length < len) {
+      dest.setArray(new byte[nextArraySize(len)]);
+    }
+    if (prefixSource.isBackedByArray()) {
+      System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), dest.getBackingArray(), 0, prefixLen);
+    } else {
+      byte[] prefixArray = prefixSource.toArray();
+      System.arraycopy(prefixArray, 0, dest.getBackingArray(), 0, prefixLen);
+    }
+    // read remaining
+    in.readFully(dest.getBackingArray(), prefixLen, remainingLen);
+    dest.setLength(len);
   }
   
-  private byte[] read(DataInput in) throws IOException {
+  private static byte[] read(DataInput in) throws IOException {
     int len = WritableUtils.readVInt(in);
     byte[] data = new byte[len];
     in.readFully(data);
@@ -411,52 +553,75 @@ public class RelativeKey implements Writ
     return key;
   }
   
-  private void write(DataOutput out, ByteSequence bs) throws IOException {
+  private static void write(DataOutput out, ByteSequence bs) throws IOException {
     WritableUtils.writeVInt(out, bs.length());
     out.write(bs.getBackingArray(), bs.offset(), bs.length());
   }
   
+  private static void writePrefix(DataOutput out, ByteSequence bs, int commonPrefixLength) throws IOException {
+    WritableUtils.writeVInt(out, commonPrefixLength);
+    WritableUtils.writeVInt(out, bs.length() - commonPrefixLength);
+    out.write(bs.getBackingArray(), bs.offset() + commonPrefixLength, bs.length() - commonPrefixLength);
+  }
+  
   @Override
   public void write(DataOutput out) throws IOException {
     
     out.writeByte(fieldsSame);
     
-    // System.out.printf("wrote fs %x%n", fieldsSame);
-    
-    bytesWritten += 1;
+    if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) {
+      out.write(fieldsPrefixed);
+    }
     
-    if ((fieldsSame & ROW_SAME) == 0) {
+    if ((fieldsSame & ROW_SAME) == ROW_SAME) {
+      // same, write nothing
+    } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
+      // similar, write what's common
+      writePrefix(out, key.getRowData(), rowCommonPrefixLen);
+    } else {
+      // write it all
       write(out, key.getRowData());
     }
     
-    if ((fieldsSame & CF_SAME) == 0) {
+    if ((fieldsSame & CF_SAME) == CF_SAME) {
+      // same, write nothing
+    } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
+      // similar, write what's common
+      writePrefix(out, key.getColumnFamilyData(), cfCommonPrefixLen);
+    } else {
+      // write it all
       write(out, key.getColumnFamilyData());
     }
     
-    if ((fieldsSame & CQ_SAME) == 0) {
-      
+    if ((fieldsSame & CQ_SAME) == CQ_SAME) {
+      // same, write nothing
+    } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
+      // similar, write what's common
+      writePrefix(out, key.getColumnQualifierData(), cqCommonPrefixLen);
+    } else {
+      // write it all
       write(out, key.getColumnQualifierData());
-      
-      /*
-       * Integer id = colFams.get(key.getColumnQualifier()); if(id == null){ id = nextId++; colFams.put(key.getColumnQualifier(), id); }
-       * 
-       * WritableUtils.writeVInt(out, id); bytesWritten += 1;
-       */
-      
     }
     
-    if ((fieldsSame & CV_SAME) == 0) {
+    if ((fieldsSame & CV_SAME) == CV_SAME) {
+      // same, write nothing
+    } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
+      // similar, write what's common
+      writePrefix(out, key.getColumnVisibilityData(), cvCommonPrefixLen);
+    } else {
+      // write it all
       write(out, key.getColumnVisibilityData());
     }
     
-    if ((fieldsSame & TS_SAME) == 0) {
+    if ((fieldsSame & TS_SAME) == TS_SAME) {
+      // same, write nothing
+    } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) {
+      // similar, write only the timestamp difference
+      WritableUtils.writeVLong(out, tsDiff);
+    } else {
+      // write it all
       WritableUtils.writeVLong(out, key.getTimestamp());
     }
   }
   
-  @Override
-  public int compareTo(RelativeKey o) {
-    throw new UnsupportedOperationException();
-  }
-  
 }

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java Tue Nov 20 17:00:21 2012
@@ -75,7 +75,7 @@ public class MultiLevelIndexTest extends
     FSDataInputStream in = new FSDataInputStream(bais);
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance());
     
-    Reader reader = new Reader(_cbr, RFile.RINDEX_VER_6);
+    Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7);
     BlockRead rootIn = _cbr.getMetaBlock("root");
     reader.readFields(rootIn);
     rootIn.close();

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java Tue Nov 20 17:00:21 2012
@@ -16,6 +16,10 @@
  */
 package org.apache.accumulo.core.file.rfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -29,8 +33,6 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 
-import junit.framework.TestCase;
-
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -53,8 +55,9 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.junit.Test;
 
-public class RFileTest extends TestCase {
+public class RFileTest {
   
   private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
   
@@ -200,18 +203,19 @@ public class RFileTest extends TestCase 
     }
   }
   
-  private Key nk(String row, String cf, String cq, String cv, long ts) {
+  static Key nk(String row, String cf, String cq, String cv, long ts) {
     return new Key(row.getBytes(), cf.getBytes(), cq.getBytes(), cv.getBytes(), ts);
   }
   
-  private Value nv(String val) {
+  static Value nv(String val) {
     return new Value(val.getBytes());
   }
   
-  private String nf(String prefix, int i) {
+  static String nf(String prefix, int i) {
     return String.format(prefix + "%06d", i);
   }
   
+  @Test
   public void test1() throws IOException {
     
     // test an emprt file
@@ -230,6 +234,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test2() throws IOException {
     
     // test an rfile with one entry
@@ -266,6 +271,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test3() throws IOException {
     
     // test an rfile with multiple rows having multiple columns
@@ -423,6 +429,7 @@ public class RFileTest extends TestCase 
     assertFalse(evi.hasNext());
   }
   
+  @Test
   public void test4() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -465,6 +472,7 @@ public class RFileTest extends TestCase 
     }
   }
   
+  @Test
   public void test5() throws IOException {
     
     TestRFile trf = new TestRFile();
@@ -493,6 +501,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test6() throws IOException {
     
     TestRFile trf = new TestRFile();
@@ -525,6 +534,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test7() throws IOException {
     // these test excercise setting the end key of a range
     
@@ -576,6 +586,7 @@ public class RFileTest extends TestCase 
     trf.reader.close();
   }
   
+  @Test
   public void test8() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -692,6 +703,7 @@ public class RFileTest extends TestCase 
     return cfs;
   }
   
+  @Test
   public void test9() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -833,6 +845,7 @@ public class RFileTest extends TestCase 
     
   }
   
+  @Test
   public void test10() throws IOException {
     
     // test empty locality groups
@@ -961,6 +974,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test11() throws IOException {
     // test locality groups with more than two entries
     
@@ -1065,6 +1079,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test12() throws IOException {
     // test inserting column fams not in locality groups
     
@@ -1096,6 +1111,7 @@ public class RFileTest extends TestCase 
     
   }
   
+  @Test
   public void test13() throws IOException {
     // test inserting column fam in default loc group that was in
     // previous locality group
@@ -1137,6 +1153,7 @@ public class RFileTest extends TestCase 
     
   }
   
+  @Test
   public void test14() throws IOException {
     // test starting locality group after default locality group was started
     
@@ -1162,6 +1179,7 @@ public class RFileTest extends TestCase 
     trf.writer.close();
   }
   
+  @Test
   public void test16() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -1180,6 +1198,7 @@ public class RFileTest extends TestCase 
     trf.closeWriter();
   }
   
+  @Test
   public void test17() throws IOException {
     // add alot of the same keys to rfile that cover multiple blocks...
     // this should cause the keys in the index to be exactly the same...
@@ -1318,6 +1337,7 @@ public class RFileTest extends TestCase 
     assertEquals(nonExcluded, colFamsSeen);
   }
   
+  @Test
   public void test18() throws IOException {
     // test writing more column families to default LG than it will track
     
@@ -1369,6 +1389,7 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test
   public void test19() throws IOException {
     // test RFile metastore
     TestRFile trf = new TestRFile();
@@ -1421,9 +1442,16 @@ public class RFileTest extends TestCase 
     trf.closeReader();
   }
   
+  @Test(expected = NullPointerException.class)
+  public void testMissingUnreleasedVersions() throws Exception {
+    runVersionTest(5);
+  }
+  
+  @Test
   public void testOldVersions() throws Exception {
     runVersionTest(3);
     runVersionTest(4);
+    runVersionTest(6);
   }
   
   private void runVersionTest(int version) throws IOException {

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java Tue Nov 20 17:00:21 2012
@@ -16,13 +16,29 @@
  */
 package org.apache.accumulo.core.file.rfile;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
 
-/**
- * 
- */
-public class RelativeKeyTest extends TestCase {
-  public void test1() {
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RelativeKeyTest {
+  
+  @Test
+  public void testBasicRelativeKey() {
     assertEquals(1, RelativeKey.nextArraySize(0));
     assertEquals(1, RelativeKey.nextArraySize(1));
     assertEquals(2, RelativeKey.nextArraySize(2));
@@ -44,4 +60,203 @@ public class RelativeKeyTest extends Tes
     assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE));
   }
   
+  @Test
+  public void testCommonPrefix() {
+    // exact matches
+    ArrayByteSequence exact = new ArrayByteSequence("abc");
+    assertEquals(-1, RelativeKey.getCommonPrefix(exact, exact));
+    assertEquals(-1, commonPrefixHelper("", ""));
+    assertEquals(-1, commonPrefixHelper("a", "a"));
+    assertEquals(-1, commonPrefixHelper("aa", "aa"));
+    assertEquals(-1, commonPrefixHelper("aaa", "aaa"));
+    assertEquals(-1, commonPrefixHelper("abab", "abab"));
+    assertEquals(-1, commonPrefixHelper(new String("aaa"), new ArrayByteSequence("aaa").toString()));
+    assertEquals(-1, commonPrefixHelper("abababababab".substring(3, 6), "ccababababcc".substring(3, 6)));
+    
+    // no common prefix
+    assertEquals(0, commonPrefixHelper("", "a"));
+    assertEquals(0, commonPrefixHelper("a", ""));
+    assertEquals(0, commonPrefixHelper("a", "b"));
+    assertEquals(0, commonPrefixHelper("aaaa", "bbbb"));
+    
+    // some common prefix
+    assertEquals(1, commonPrefixHelper("a", "ab"));
+    assertEquals(1, commonPrefixHelper("ab", "ac"));
+    assertEquals(1, commonPrefixHelper("ab", "ac"));
+    assertEquals(2, commonPrefixHelper("aa", "aaaa"));
+    assertEquals(4, commonPrefixHelper("aaaaa", "aaaab"));
+  }
+  
+  private int commonPrefixHelper(String a, String b) {
+    return RelativeKey.getCommonPrefix(new ArrayByteSequence(a), new ArrayByteSequence(b));
+  }
+  
+  @Test
+  public void testReadWritePrefix() throws IOException {
+    Key prevKey = new Key("row1", "columnfamily1", "columnqualifier1", "columnvisibility1", 1000);
+    Key newKey = new Key("row2", "columnfamily2", "columnqualifier2", "columnvisibility2", 3000);
+    RelativeKey expected = new RelativeKey(prevKey, newKey);
+    
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    expected.write(out);
+    
+    RelativeKey actual = new RelativeKey();
+    actual.setPrevKey(prevKey);
+    actual.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+    
+    assertEquals(expected.getKey(), actual.getKey());
+  }
+  
+  private static ArrayList<Key> expectedKeys;
+  private static ArrayList<Value> expectedValues;
+  private static ArrayList<Integer> expectedPositions;
+  private static ByteArrayOutputStream baos;
+  
+  @BeforeClass
+  public static void initSource() throws IOException {
+    int initialListSize = 10000;
+    
+    baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    
+    expectedKeys = new ArrayList<Key>(initialListSize);
+    expectedValues = new ArrayList<Value>(initialListSize);
+    expectedPositions = new ArrayList<Integer>(initialListSize);
+    
+    Key prev = null;
+    int val = 0;
+    for (int row = 0; row < 4; row++) {
+      String rowS = RFileTest.nf("r_", row);
+      for (int cf = 0; cf < 4; cf++) {
+        String cfS = RFileTest.nf("cf_", cf);
+        for (int cq = 0; cq < 4; cq++) {
+          String cqS = RFileTest.nf("cq_", cq);
+          for (int cv = 'A'; cv < 'A' + 4; cv++) {
+            String cvS = "" + (char) cv;
+            for (int ts = 4; ts > 0; ts--) {
+              Key k = RFileTest.nk(rowS, cfS, cqS, cvS, ts);
+              k.setDeleted(true);
+              Value v = RFileTest.nv("" + val);
+              expectedPositions.add(out.size());
+              new RelativeKey(prev, k).write(out);
+              prev = k;
+              v.write(out);
+              expectedKeys.add(k);
+              expectedValues.add(v);
+              
+              k = RFileTest.nk(rowS, cfS, cqS, cvS, ts);
+              v = RFileTest.nv("" + val);
+              expectedPositions.add(out.size());
+              new RelativeKey(prev, k).write(out);
+              prev = k;
+              v.write(out);
+              expectedKeys.add(k);
+              expectedValues.add(v);
+              
+              val++;
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  private DataInputStream in;
+  
+  @Before
+  public void setupDataInputStream() {
+    in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    in.mark(0);
+  }
+  
+  @Test
+  public void testSeekBeforeEverything() throws IOException {
+    Key seekKey = new Key();
+    Key prevKey = new Key();
+    Key currKey = null;
+    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    assertEquals(1, skippr.skipped);
+    assertEquals(new Key(), skippr.prevKey);
+    assertEquals(expectedKeys.get(0), skippr.rk.getKey());
+    assertEquals(expectedValues.get(0).toString(), value.toString());
+    
+    // ensure we can advance after fastskip
+    skippr.rk.readFields(in);
+    assertEquals(expectedKeys.get(1), skippr.rk.getKey());
+    
+    in.reset();
+    
+    seekKey = new Key("a", "b", "c", "d", 1);
+    seekKey.setDeleted(true);
+    skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    assertEquals(1, skippr.skipped);
+    assertEquals(new Key(), skippr.prevKey);
+    assertEquals(expectedKeys.get(0), skippr.rk.getKey());
+    assertEquals(expectedValues.get(0).toString(), value.toString());
+    
+    skippr.rk.readFields(in);
+    assertEquals(expectedKeys.get(1), skippr.rk.getKey());
+  }
+  
+  @Test(expected = EOFException.class)
+  public void testSeekAfterEverything() throws IOException {
+    Key seekKey = new Key("s", "t", "u", "v", 1);
+    Key prevKey = new Key();
+    Key currKey = null;
+    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    
+    RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+  }
+  
+  @Test
+  public void testSeekMiddle() throws IOException {
+    int seekIndex = expectedKeys.size() / 2;
+    Key seekKey = expectedKeys.get(seekIndex);
+    Key prevKey = new Key();
+    Key currKey = null;
+    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    
+    assertEquals(seekIndex + 1, skippr.skipped);
+    assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey);
+    assertEquals(expectedKeys.get(seekIndex), skippr.rk.getKey());
+    assertEquals(expectedValues.get(seekIndex).toString(), value.toString());
+    
+    skippr.rk.readFields(in);
+    assertEquals(expectedValues.get(seekIndex + 1).toString(), value.toString());
+    
+    // try fast skipping to a key that does not exist
+    in.reset();
+    Key fKey = expectedKeys.get(seekIndex).followingKey(PartialKey.ROW_COLFAM_COLQUAL);
+    int i;
+    for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {}
+    
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey);
+    assertEquals(i + 1, skippr.skipped);
+    assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
+    assertEquals(expectedKeys.get(i), skippr.rk.getKey());
+    assertEquals(expectedValues.get(i).toString(), value.toString());
+    
+    // try fast skipping to our current location
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i));
+    assertEquals(0, skippr.skipped);
+    assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
+    assertEquals(expectedKeys.get(i), skippr.rk.getKey());
+    assertEquals(expectedValues.get(i).toString(), value.toString());
+    
+    // try fast skipping 1 column family ahead from our current location, testing fastskip from middle of block as opposed to starting at beginning of block
+    fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM);
+    int j;
+    for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {}
+    skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i));
+    assertEquals(j - i, skippr.skipped);
+    assertEquals(expectedKeys.get(j - 1), skippr.prevKey);
+    assertEquals(expectedKeys.get(j), skippr.rk.getKey());
+    assertEquals(expectedValues.get(j).toString(), value.toString());
+    
+  }
 }



Mime
View raw message