Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 77445D4D3 for ; Tue, 20 Nov 2012 17:00:52 +0000 (UTC) Received: (qmail 13312 invoked by uid 500); 20 Nov 2012 17:00:52 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 13176 invoked by uid 500); 20 Nov 2012 17:00:48 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 13110 invoked by uid 99); 20 Nov 2012 17:00:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Nov 2012 17:00:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Nov 2012 17:00:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3E026238896F for ; Tue, 20 Nov 2012 17:00:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1411745 - in /accumulo/trunk: ./ core/src/main/java/org/apache/accumulo/core/file/rfile/ core/src/test/java/org/apache/accumulo/core/file/rfile/ Date: Tue, 20 Nov 2012 17:00:22 -0000 To: commits@accumulo.apache.org From: ctubbsii@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121120170023.3E026238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ctubbsii Date: Tue Nov 20 17:00:21 2012 New Revision: 1411745 URL: http://svn.apache.org/viewvc?rev=1411745&view=rev Log: ACCUMULO-790 Added prefix compression for relative key files, per key component Modified: accumulo/trunk/.gitignore accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java Modified: accumulo/trunk/.gitignore URL: http://svn.apache.org/viewvc/accumulo/trunk/.gitignore?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/.gitignore (original) +++ accumulo/trunk/.gitignore Tue Nov 20 17:00:21 2012 @@ -83,6 +83,7 @@ /core/.classpath # /examples/ +/examples/lib /examples/target /examples/.settings /examples/.classpath @@ -125,6 +126,12 @@ /trace/.project /trace/.settings +# /test/ +/test/target +/test/.classpath +/test/.project +/test/.settings + # /test/system/auto/ /test/system/auto/*.pyc /test/system/auto/fake_disk_failure.so Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java Tue Nov 20 17:00:21 2012 @@ -58,13 +58,15 @@ public class BlockIndex { public static class BlockIndexEntry implements Comparable { private Key key; + private Key prevKey; private int entriesLeft; private int pos; - public BlockIndexEntry(int pos, int entriesLeft, Key key) { + public BlockIndexEntry(int pos, int entriesLeft, Key key, Key prevKey) { this.pos = pos; this.entriesLeft = entriesLeft; this.key = key; + this.prevKey = prevKey; } /** @@ -87,14 +89,18 @@ public class BlockIndex { return key.compareTo(o.key); } + @Override public String toString() { return key + " " + entriesLeft + " " + pos; } + + public Key getPrevKey() { + return prevKey; + } } public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) { - // get a local ref to the index, another thread could change it BlockIndexEntry[] blockIndex = this.blockIndex; @@ -150,12 +156,13 @@ public class BlockIndex { while (count < (indexEntry.getNumEntries() - interval + 1)) { + Key myPrevKey = rk.getKey(); int pos = cacheBlock.getPosition(); rk.readFields(cacheBlock); val.readFields(cacheBlock); if (count > 0 && count % interval == 0) { - index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey())); + index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey(), myPrevKey)); } count++; Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java Tue Nov 20 17:00:21 2012 @@ -252,7 +252,7 @@ public class MultiLevelIndex { public void readFields(DataInput in, int version) throws IOException { - if (version == RFile.RINDEX_VER_6) { + if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) { level = in.readInt(); offset = in.readInt(); hasNext = in.readBoolean(); @@ -723,7 +723,7 @@ public class MultiLevelIndex { size = 0; - if (version == RFile.RINDEX_VER_6) { + if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) { size = in.readInt(); } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Tue Nov 20 17:00:21 2012 @@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfi import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry; import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator; import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence; +import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorEnvironment; @@ -78,7 +79,9 @@ public class RFile { private RFile() {} private static final int RINDEX_MAGIC = 0x20637474; + static final int RINDEX_VER_7 = 7; static final int RINDEX_VER_6 = 6; + // static final int RINDEX_VER_5 = 5; // unreleased static final int RINDEX_VER_4 = 4; static final int RINDEX_VER_3 = 3; @@ -327,6 +330,7 @@ public class RFile { previousColumnFamilies = new HashSet(); } + @Override public synchronized void close() throws IOException { if (closed) { @@ -338,7 +342,7 @@ public class RFile { ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index"); mba.writeInt(RINDEX_MAGIC); - mba.writeInt(RINDEX_VER_6); + mba.writeInt(RINDEX_VER_7); if (currentLocalityGroup != null) localityGroups.add(currentLocalityGroup); @@ -369,6 +373,7 @@ public class RFile { } } + @Override public void append(Key key, Value value) throws IOException { if (dataClosed) { @@ -685,14 +690,12 @@ public class RFile { // and speed up others. MByteSequence valbs = new MByteSequence(new byte[64], 0, 0); - RelativeKey tmpRk = new RelativeKey(); - Key pKey = new Key(getTopKey()); - int fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, pKey, getTopKey()); - if (fastSkipped > 0) { - entriesLeft -= fastSkipped; + SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey()); + if (skippr.skipped > 0) { + entriesLeft -= skippr.skipped; val = new Value(valbs.toArray()); - prevKey = pKey; - rk = tmpRk; + prevKey = skippr.prevKey; + rk = skippr.rk; } reseek = false; @@ -705,8 +708,6 @@ public class RFile { } } - int fastSkipped = -1; - if (reseek) { iiter = index.lookup(startKey); @@ -735,7 +736,6 @@ public class RFile { if (!checkRange) hasTop = true; - RelativeKey tmpRk = new RelativeKey(); MByteSequence valbs = new MByteSequence(new byte[64], 0, 0); Key currKey = null; @@ -747,7 +747,8 @@ public class RFile { if (bie != null) { // we are seeked to the current position of the key in the index // need to prime the read process and read this key from the block - tmpRk.setPrevKey(bie.getKey()); + RelativeKey tmpRk = new RelativeKey(); + tmpRk.setPrevKey(bie.getPrevKey()); tmpRk.readFields(currBlock); val = new Value(); @@ -756,18 +757,19 @@ public class RFile { // just consumed one key from the input stream, so subtract one from entries left entriesLeft = bie.getEntriesLeft() - 1; - prevKey = new Key(bie.getKey()); + prevKey = new Key(bie.getPrevKey()); currKey = bie.getKey(); } } } - fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey); - entriesLeft -= fastSkipped; + SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey); + prevKey = skippr.prevKey; + entriesLeft -= skippr.skipped; val = new Value(valbs.toArray()); // set rk when everything above is successful, if exception // occurs rk will not be set - rk = tmpRk; + rk = skippr.rk; } } @@ -842,7 +844,7 @@ public class RFile { if (magic != RINDEX_MAGIC) throw new IOException("Did not see expected magic number, saw " + magic); - if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3) + if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3) throw new IOException("Did not see expected version, saw " + ver); int size = mb.readInt(); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java Tue Nov 20 17:00:21 2012 @@ -16,94 +16,115 @@ */ package org.apache.accumulo.core.file.rfile; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.Set; -import java.util.zip.GZIPOutputStream; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -public class RelativeKey implements WritableComparable { +public class RelativeKey implements Writable { - private Key key; - - private byte fieldsSame; + private static final byte BIT = 0x01; + private Key key; private Key prevKey; - private static final byte ROW_SAME = 0x01; - private static final byte CF_SAME = 0x02; - private static final byte CQ_SAME = 0x04; - private static final byte CV_SAME = 0x08; - private static final byte TS_SAME = 0x10; - private static final byte DELETED = 0x20; - - private static HashMap colFams = new HashMap(); - - private static long bytesWritten = 0; + private byte fieldsSame; + private byte fieldsPrefixed; - public static void printStats() throws Exception { - System.out.println("colFams.size() : " + colFams.size()); - Set> es = colFams.entrySet(); - - int sum = 0; - - for (Entry entry : es) { - sum += entry.getKey().getLength(); - } - - System.out.println("Total Column name bytes : " + sum); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(baos)); - for (Entry entry : es) { - entry.getKey().write(dos); - dos.writeInt(entry.getValue()); - } - - dos.close(); - - System.out.println("Compressed column map size : " + baos.toByteArray().length); - System.out.printf("Bytes written : %,d%n", bytesWritten); - - } + // Exact match compression options (first byte) and flag for further + private static final byte ROW_SAME = BIT << 0; + private static final byte CF_SAME = BIT << 1; + private static final byte CQ_SAME = BIT << 2; + private static final byte CV_SAME = BIT << 3; + private static final byte TS_SAME = BIT << 4; + private static final byte DELETED = BIT << 5; + // private static final byte UNUSED_1_6 = BIT << 6; + private static final byte PREFIX_COMPRESSION_ENABLED = (byte) (BIT << 7); + + // Prefix compression (second byte) + private static final byte ROW_COMMON_PREFIX = BIT << 0; + private static final byte CF_COMMON_PREFIX = BIT << 1; + private static final byte CQ_COMMON_PREFIX = BIT << 2; + private static final byte CV_COMMON_PREFIX = BIT << 3; + private static final byte TS_DIFF = BIT << 4; + + // private static final byte UNUSED_2_5 = BIT << 5; + // private static final byte UNUSED_2_6 = BIT << 6; + // private static final byte UNUSED_2_7 = (byte) (BIT << 7); + + // Values for prefix compression + int rowCommonPrefixLen; + int cfCommonPrefixLen; + int cqCommonPrefixLen; + int cvCommonPrefixLen; + long tsDiff; + /** + * This constructor is used when one needs to read from an input stream + */ public RelativeKey() { } + /** + * This constructor is used when constructing a key for writing to an output stream + */ public RelativeKey(Key prevKey, Key key) { this.key = key; fieldsSame = 0; + fieldsPrefixed = 0; + + ByteSequence prevKeyScratch; + ByteSequence keyScratch; if (prevKey != null) { - if (prevKey.getRowData().equals(key.getRowData())) + + prevKeyScratch = prevKey.getRowData(); + keyScratch = key.getRowData(); + rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); + if (rowCommonPrefixLen == -1) fieldsSame |= ROW_SAME; + else if (rowCommonPrefixLen > 1) + fieldsPrefixed |= ROW_COMMON_PREFIX; - if (prevKey.getColumnFamilyData().equals(key.getColumnFamilyData())) + prevKeyScratch = prevKey.getColumnFamilyData(); + keyScratch = key.getColumnFamilyData(); + cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); + if (cfCommonPrefixLen == -1) fieldsSame |= CF_SAME; + else if (cfCommonPrefixLen > 1) + fieldsPrefixed |= CF_COMMON_PREFIX; - if (prevKey.getColumnQualifierData().equals(key.getColumnQualifierData())) + prevKeyScratch = prevKey.getColumnQualifierData(); + keyScratch = key.getColumnQualifierData(); + cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); + if (cqCommonPrefixLen == -1) fieldsSame |= CQ_SAME; + else if (cqCommonPrefixLen > 1) + fieldsPrefixed |= CQ_COMMON_PREFIX; - if (prevKey.getColumnVisibilityData().equals(key.getColumnVisibilityData())) + prevKeyScratch = prevKey.getColumnVisibilityData(); + keyScratch = key.getColumnVisibilityData(); + cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); + if (cvCommonPrefixLen == -1) fieldsSame |= CV_SAME; + else if (cvCommonPrefixLen > 1) + fieldsPrefixed |= CV_COMMON_PREFIX; - if (prevKey.getTimestamp() == key.getTimestamp()) + tsDiff = key.getTimestamp() - prevKey.getTimestamp(); + if (tsDiff == 0) fieldsSame |= TS_SAME; + else + fieldsPrefixed |= TS_DIFF; + fieldsSame |= fieldsPrefixed == 0 ? 0 : PREFIX_COMPRESSION_ENABLED; } // stored deleted information in bit vector instead of its own byte @@ -111,6 +132,31 @@ public class RelativeKey implements Writ fieldsSame |= DELETED; } + /** + * + * @return -1 (exact match) or the number of bytes in common + */ + static int getCommonPrefix(ByteSequence prev, ByteSequence cur) { + if (prev == cur) + return -1; // infinite... exact match + + int prevLen = prev.length(); + int curLen = cur.length(); + int maxChecks = Math.min(prevLen, curLen); + int common = 0; + while (common < maxChecks) { + int a = prev.byteAt(common) & 0xff; + int b = cur.byteAt(common) & 0xff; + if (a != b) + return common; + common++; + } + // no differences found + // either exact or matches the part checked, so if they are the same length, they are an exact match, + // and if not, then they have a common prefix over all the checks we've done + return prevLen == curLen ? -1 : maxChecks; + } + public void setPrevKey(Key pk) { this.prevKey = pk; } @@ -118,42 +164,56 @@ public class RelativeKey implements Writ @Override public void readFields(DataInput in) throws IOException { fieldsSame = in.readByte(); + if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) { + fieldsPrefixed = in.readByte(); + } else { + fieldsPrefixed = 0; + } byte[] row, cf, cq, cv; long ts; - if ((fieldsSame & ROW_SAME) == 0) { - row = read(in); - } else { + if ((fieldsSame & ROW_SAME) == ROW_SAME) { row = prevKey.getRowData().toArray(); + } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) { + row = readPrefix(in, prevKey.getRowData()); + } else { + row = read(in); } - if ((fieldsSame & CF_SAME) == 0) { - cf = read(in); - } else { + if ((fieldsSame & CF_SAME) == CF_SAME) { cf = prevKey.getColumnFamilyData().toArray(); + } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) { + cf = readPrefix(in, prevKey.getColumnFamilyData()); + } else { + cf = read(in); } - if ((fieldsSame & CQ_SAME) == 0) { - cq = read(in); - } else { + if ((fieldsSame & CQ_SAME) == CQ_SAME) { cq = prevKey.getColumnQualifierData().toArray(); + } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) { + cq = readPrefix(in, prevKey.getColumnQualifierData()); + } else { + cq = read(in); } - if ((fieldsSame & CV_SAME) == 0) { - cv = read(in); - } else { + if ((fieldsSame & CV_SAME) == CV_SAME) { cv = prevKey.getColumnVisibilityData().toArray(); + } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) { + cv = readPrefix(in, prevKey.getColumnVisibilityData()); + } else { + cv = read(in); } - if ((fieldsSame & TS_SAME) == 0) { - ts = WritableUtils.readVLong(in); - } else { + if ((fieldsSame & TS_SAME) == TS_SAME) { ts = prevKey.getTimestamp(); + } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) { + ts = WritableUtils.readVLong(in) + prevKey.getTimestamp(); + } else { + ts = WritableUtils.readVLong(in); } - this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) != 0, false); - + this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) == DELETED, false); this.prevKey = this.key; } @@ -182,10 +242,22 @@ public class RelativeKey implements Writ } } - int fastSkip(DataInput in, Key seekKey, MByteSequence value, Key pkey, Key currKey) throws IOException { + public static class SkippR { + RelativeKey rk; + int skipped; + Key prevKey; + + SkippR(RelativeKey rk, int skipped, Key prevKey) { + this.rk = rk; + this.skipped = skipped; + this.prevKey = prevKey; + } + } + + public static SkippR fastSkip(DataInput in, Key seekKey, MByteSequence value, Key prevKey, Key currKey) throws IOException { // this method assumes that fast skip is being called on a compressed block where the last key - // in the compressed block is >= seekKey... therefore this method should go passed the end of the - // compressed block... if it does, there is probably an error in the callers logic + // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the + // compressed block... if it does, there is probably an error in the caller's logic // this method mostly avoids object allocation and only does compares when the row changes @@ -204,11 +276,11 @@ public class RelativeKey implements Writ if (currKey != null) { - prow = new MByteSequence(pkey.getRowData()); - pcf = new MByteSequence(pkey.getColumnFamilyData()); - pcq = new MByteSequence(pkey.getColumnQualifierData()); - pcv = new MByteSequence(pkey.getColumnVisibilityData()); - pts = pkey.getTimestamp(); + prow = new MByteSequence(currKey.getRowData()); + pcf = new MByteSequence(currKey.getColumnFamilyData()); + pcq = new MByteSequence(currKey.getColumnQualifierData()); + pcv = new MByteSequence(currKey.getColumnVisibilityData()); + pts = currKey.getTimestamp(); row = new MByteSequence(currKey.getRowData()); cf = new MByteSequence(currKey.getColumnFamilyData()); @@ -221,15 +293,24 @@ public class RelativeKey implements Writ cqCmp = cq.compareTo(stopCQ); if (rowCmp >= 0) { - if (rowCmp > 0) - return 0; + if (rowCmp > 0) { + RelativeKey rk = new RelativeKey(); + rk.key = rk.prevKey = new Key(currKey); + return new SkippR(rk, 0, prevKey); + } if (cfCmp >= 0) { - if (cfCmp > 0) - return 0; + if (cfCmp > 0) { + RelativeKey rk = new RelativeKey(); + rk.key = rk.prevKey = new Key(currKey); + return new SkippR(rk, 0, prevKey); + } - if (cqCmp >= 0) - return 0; + if (cqCmp >= 0) { + RelativeKey rk = new RelativeKey(); + rk.key = rk.prevKey = new Key(currKey); + return new SkippR(rk, 0, prevKey); + } } } @@ -246,22 +327,31 @@ public class RelativeKey implements Writ } byte fieldsSame = -1; + byte fieldsPrefixed = 0; int count = 0; + Key newPrevKey = null; while (true) { - pdel = (fieldsSame & DELETED) != 0; + pdel = (fieldsSame & DELETED) == DELETED; fieldsSame = in.readByte(); + if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) + fieldsPrefixed = in.readByte(); + else + fieldsPrefixed = 0; boolean changed = false; - if ((fieldsSame & ROW_SAME) == 0) { + if ((fieldsSame & ROW_SAME) != ROW_SAME) { MByteSequence tmp = prow; prow = row; row = tmp; + if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) + readPrefix(in, row, prow); + else read(in, row); // read a new row, so need to compare... @@ -269,41 +359,54 @@ public class RelativeKey implements Writ changed = true; }// else the row is the same as the last, so no need to compare - if ((fieldsSame & CF_SAME) == 0) { + if ((fieldsSame & CF_SAME) != CF_SAME) { MByteSequence tmp = pcf; pcf = cf; cf = tmp; + if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) + readPrefix(in, cf, pcf); + else read(in, cf); cfCmp = cf.compareTo(stopCF); changed = true; } - if ((fieldsSame & CQ_SAME) == 0) { + if ((fieldsSame & CQ_SAME) != CQ_SAME) { MByteSequence tmp = pcq; pcq = cq; cq = tmp; + if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) + readPrefix(in, cq, pcq); + else read(in, cq); cqCmp = cq.compareTo(stopCQ); changed = true; } - if ((fieldsSame & CV_SAME) == 0) { + if ((fieldsSame & CV_SAME) != CV_SAME) { MByteSequence tmp = pcv; pcv = cv; cv = tmp; + if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) + readPrefix(in, cv, pcv); + else read(in, cv); } - if ((fieldsSame & TS_SAME) == 0) { + if ((fieldsSame & TS_SAME) != TS_SAME) { pts = ts; + + if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) + ts = WritableUtils.readVLong(in) + pts; + else ts = WritableUtils.readVLong(in); } @@ -332,33 +435,39 @@ public class RelativeKey implements Writ // when the current keys field is same as the last, then // set the prev keys field the same as the current key - trow = (fieldsSame & ROW_SAME) == 0 ? prow : row; - tcf = (fieldsSame & CF_SAME) == 0 ? pcf : cf; - tcq = (fieldsSame & CQ_SAME) == 0 ? pcq : cq; - tcv = (fieldsSame & CV_SAME) == 0 ? pcv : cv; - tts = (fieldsSame & TS_SAME) == 0 ? pts : ts; + trow = (fieldsSame & ROW_SAME) == ROW_SAME ? row : prow; + tcf = (fieldsSame & CF_SAME) == CF_SAME ? cf : pcf; + tcq = (fieldsSame & CQ_SAME) == CQ_SAME ? cq : pcq; + tcv = (fieldsSame & CV_SAME) == CV_SAME ? cv : pcv; + tts = (fieldsSame & TS_SAME) == TS_SAME ? ts : pts; - Key tmp = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(), + newPrevKey = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(), tcq.offset(), tcq.length(), tcv.getBackingArray(), tcv.offset(), tcv.length(), tts); - tmp.setDeleted(pdel); - pkey.set(tmp); + newPrevKey.setDeleted(pdel); + } else if (count == 1) { + if (currKey != null) + newPrevKey = currKey; + else + newPrevKey = prevKey; + } else { + throw new IllegalStateException(); } - this.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(), + RelativeKey result = new RelativeKey(); + result.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(), cq.length(), cv.getBackingArray(), cv.offset(), cv.length(), ts); - this.key.setDeleted((fieldsSame & DELETED) != 0); + result.key.setDeleted((fieldsSame & DELETED) != 0); + result.prevKey = result.key; - this.prevKey = this.key; - - return count; + return new SkippR(result, count, newPrevKey); } - private void read(DataInput in, MByteSequence mbseq) throws IOException { + private static void read(DataInput in, MByteSequence mbseq) throws IOException { int len = WritableUtils.readVInt(in); read(in, mbseq, len); } - private void readValue(DataInput in, MByteSequence mbseq) throws IOException { + private static void readValue(DataInput in, MByteSequence mbseq) throws IOException { int len = in.readInt(); read(in, mbseq, len); } @@ -391,16 +500,49 @@ public class RelativeKey implements Writ return ret; } - private void read(DataInput in, MByteSequence mbseq, int len) throws IOException { - if (mbseq.getBackingArray().length < len) { - mbseq.setArray(new byte[nextArraySize(len)]); + private static void read(DataInput in, MByteSequence mbseqDestination, int len) throws IOException { + if (mbseqDestination.getBackingArray().length < len) { + mbseqDestination.setArray(new byte[nextArraySize(len)]); + } + + in.readFully(mbseqDestination.getBackingArray(), 0, len); + mbseqDestination.setLength(len); + } + + private static byte[] readPrefix(DataInput in, ByteSequence prefixSource) throws IOException { + int prefixLen = WritableUtils.readVInt(in); + int remainingLen = WritableUtils.readVInt(in); + byte[] data = new byte[prefixLen + remainingLen]; + if (prefixSource.isBackedByArray()) { + System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), data, 0, prefixLen); + } else { + byte[] prefixArray = prefixSource.toArray(); + System.arraycopy(prefixArray, 0, data, 0, prefixLen); + } + // read remaining + in.readFully(data, prefixLen, remainingLen); + return data; } - in.readFully(mbseq.getBackingArray(), 0, len); - mbseq.setLength(len); + private static void readPrefix(DataInput in, MByteSequence dest, ByteSequence prefixSource) throws IOException { + int prefixLen = WritableUtils.readVInt(in); + int remainingLen = WritableUtils.readVInt(in); + int len = prefixLen + remainingLen; + if (dest.getBackingArray().length < len) { + dest.setArray(new byte[nextArraySize(len)]); + } + if (prefixSource.isBackedByArray()) { + System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), dest.getBackingArray(), 0, prefixLen); + } else { + byte[] prefixArray = prefixSource.toArray(); + System.arraycopy(prefixArray, 0, dest.getBackingArray(), 0, prefixLen); + } + // read remaining + in.readFully(dest.getBackingArray(), prefixLen, remainingLen); + dest.setLength(len); } - private byte[] read(DataInput in) throws IOException { + private static byte[] read(DataInput in) throws IOException { int len = WritableUtils.readVInt(in); byte[] data = new byte[len]; in.readFully(data); @@ -411,52 +553,75 @@ public class RelativeKey implements Writ return key; } - private void write(DataOutput out, ByteSequence bs) throws IOException { + private static void write(DataOutput out, ByteSequence bs) throws IOException { WritableUtils.writeVInt(out, bs.length()); out.write(bs.getBackingArray(), bs.offset(), bs.length()); } + private static void writePrefix(DataOutput out, ByteSequence bs, int commonPrefixLength) throws IOException { + WritableUtils.writeVInt(out, commonPrefixLength); + WritableUtils.writeVInt(out, bs.length() - commonPrefixLength); + out.write(bs.getBackingArray(), bs.offset() + commonPrefixLength, bs.length() - commonPrefixLength); + } + @Override public void write(DataOutput out) throws IOException { out.writeByte(fieldsSame); - // System.out.printf("wrote fs %x%n", fieldsSame); - - bytesWritten += 1; + if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) { + out.write(fieldsPrefixed); + } - if ((fieldsSame & ROW_SAME) == 0) { + if ((fieldsSame & ROW_SAME) == ROW_SAME) { + // same, write nothing + } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) { + // similar, write what's common + writePrefix(out, key.getRowData(), rowCommonPrefixLen); + } else { + // write it all write(out, key.getRowData()); } - if ((fieldsSame & CF_SAME) == 0) { + if ((fieldsSame & CF_SAME) == CF_SAME) { + // same, write nothing + } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) { + // similar, write what's common + writePrefix(out, key.getColumnFamilyData(), cfCommonPrefixLen); + } else { + // write it all write(out, key.getColumnFamilyData()); } - if ((fieldsSame & CQ_SAME) == 0) { - + if ((fieldsSame & CQ_SAME) == CQ_SAME) { + // same, write nothing + } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) { + // similar, write what's common + writePrefix(out, key.getColumnQualifierData(), cqCommonPrefixLen); + } else { + // write it all write(out, key.getColumnQualifierData()); - - /* - * Integer id = colFams.get(key.getColumnQualifier()); if(id == null){ id = nextId++; colFams.put(key.getColumnQualifier(), id); } - * - * WritableUtils.writeVInt(out, id); bytesWritten += 1; - */ - } - if ((fieldsSame & CV_SAME) == 0) { + if ((fieldsSame & CV_SAME) == CV_SAME) { + // same, write nothing + } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) { + // similar, write what's common + writePrefix(out, key.getColumnVisibilityData(), cvCommonPrefixLen); + } else { + // write it all write(out, key.getColumnVisibilityData()); } - if ((fieldsSame & TS_SAME) == 0) { + if ((fieldsSame & TS_SAME) == TS_SAME) { + // same, write nothing + } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) { + // similar, write what's common + WritableUtils.writeVLong(out, tsDiff); + } else { + // write it all WritableUtils.writeVLong(out, key.getTimestamp()); } } - @Override - public int compareTo(RelativeKey o) { - throw new UnsupportedOperationException(); - } - } Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java Tue Nov 20 17:00:21 2012 @@ -75,7 +75,7 @@ public class MultiLevelIndexTest extends FSDataInputStream in = new FSDataInputStream(bais); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance()); - Reader reader = new Reader(_cbr, RFile.RINDEX_VER_6); + Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7); BlockRead rootIn = _cbr.getMetaBlock("root"); reader.readFields(rootIn); rootIn.close(); Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java Tue Nov 20 17:00:21 2012 @@ -16,6 +16,10 @@ */ package org.apache.accumulo.core.file.rfile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -29,8 +33,6 @@ import java.util.Iterator; import java.util.Random; import java.util.Set; -import junit.framework.TestCase; - import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -53,8 +55,9 @@ import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.junit.Test; -public class RFileTest extends TestCase { +public class RFileTest { private static final Collection EMPTY_COL_FAMS = new ArrayList(); @@ -200,18 +203,19 @@ public class RFileTest extends TestCase } } - private Key nk(String row, String cf, String cq, String cv, long ts) { + static Key nk(String row, String cf, String cq, String cv, long ts) { return new Key(row.getBytes(), cf.getBytes(), cq.getBytes(), cv.getBytes(), ts); } - private Value nv(String val) { + static Value nv(String val) { return new Value(val.getBytes()); } - private String nf(String prefix, int i) { + static String nf(String prefix, int i) { return String.format(prefix + "%06d", i); } + @Test public void test1() throws IOException { // test an emprt file @@ -230,6 +234,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test2() throws IOException { // test an rfile with one entry @@ -266,6 +271,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test3() throws IOException { // test an rfile with multiple rows having multiple columns @@ -423,6 +429,7 @@ public class RFileTest extends TestCase assertFalse(evi.hasNext()); } + @Test public void test4() throws IOException { TestRFile trf = new TestRFile(); @@ -465,6 +472,7 @@ public class RFileTest extends TestCase } } + @Test public void test5() throws IOException { TestRFile trf = new TestRFile(); @@ -493,6 +501,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test6() throws IOException { TestRFile trf = new TestRFile(); @@ -525,6 +534,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test7() throws IOException { // these test excercise setting the end key of a range @@ -576,6 +586,7 @@ public class RFileTest extends TestCase trf.reader.close(); } + @Test public void test8() throws IOException { TestRFile trf = new TestRFile(); @@ -692,6 +703,7 @@ public class RFileTest extends TestCase return cfs; } + @Test public void test9() throws IOException { TestRFile trf = new TestRFile(); @@ -833,6 +845,7 @@ public class RFileTest extends TestCase } + @Test public void test10() throws IOException { // test empty locality groups @@ -961,6 +974,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test11() throws IOException { // test locality groups with more than two entries @@ -1065,6 +1079,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test12() throws IOException { // test inserting column fams not in locality groups @@ -1096,6 +1111,7 @@ public class RFileTest extends TestCase } + @Test public void test13() throws IOException { // test inserting column fam in default loc group that was in // previous locality group @@ -1137,6 +1153,7 @@ public class RFileTest extends TestCase } + @Test public void test14() throws IOException { // test starting locality group after default locality group was started @@ -1162,6 +1179,7 @@ public class RFileTest extends TestCase trf.writer.close(); } + @Test public void test16() throws IOException { TestRFile trf = new TestRFile(); @@ -1180,6 +1198,7 @@ public class RFileTest extends TestCase trf.closeWriter(); } + @Test public void test17() throws IOException { // add alot of the same keys to rfile that cover multiple blocks... // this should cause the keys in the index to be exactly the same... @@ -1318,6 +1337,7 @@ public class RFileTest extends TestCase assertEquals(nonExcluded, colFamsSeen); } + @Test public void test18() throws IOException { // test writing more column families to default LG than it will track @@ -1369,6 +1389,7 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test public void test19() throws IOException { // test RFile metastore TestRFile trf = new TestRFile(); @@ -1421,9 +1442,16 @@ public class RFileTest extends TestCase trf.closeReader(); } + @Test(expected = NullPointerException.class) + public void testMissingUnreleasedVersions() throws Exception { + runVersionTest(5); + } + + @Test public void testOldVersions() throws Exception { runVersionTest(3); runVersionTest(4); + runVersionTest(6); } private void runVersionTest(int version) throws IOException { Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java Tue Nov 20 17:00:21 2012 @@ -16,13 +16,29 @@ */ package org.apache.accumulo.core.file.rfile; -import junit.framework.TestCase; +import static org.junit.Assert.assertEquals; -/** - * - */ -public class RelativeKeyTest extends TestCase { - public void test1() { +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RelativeKeyTest { + + @Test + public void testBasicRelativeKey() { assertEquals(1, RelativeKey.nextArraySize(0)); assertEquals(1, RelativeKey.nextArraySize(1)); assertEquals(2, RelativeKey.nextArraySize(2)); @@ -44,4 +60,203 @@ public class RelativeKeyTest extends Tes assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE)); } + @Test + public void testCommonPrefix() { + // exact matches + ArrayByteSequence exact = new ArrayByteSequence("abc"); + assertEquals(-1, RelativeKey.getCommonPrefix(exact, exact)); + assertEquals(-1, commonPrefixHelper("", "")); + assertEquals(-1, commonPrefixHelper("a", "a")); + assertEquals(-1, commonPrefixHelper("aa", "aa")); + assertEquals(-1, commonPrefixHelper("aaa", "aaa")); + assertEquals(-1, commonPrefixHelper("abab", "abab")); + assertEquals(-1, commonPrefixHelper(new String("aaa"), new ArrayByteSequence("aaa").toString())); + assertEquals(-1, commonPrefixHelper("abababababab".substring(3, 6), "ccababababcc".substring(3, 6))); + + // no common prefix + assertEquals(0, commonPrefixHelper("", "a")); + assertEquals(0, commonPrefixHelper("a", "")); + assertEquals(0, commonPrefixHelper("a", "b")); + assertEquals(0, commonPrefixHelper("aaaa", "bbbb")); + + // some common prefix + assertEquals(1, commonPrefixHelper("a", "ab")); + assertEquals(1, commonPrefixHelper("ab", "ac")); + assertEquals(1, commonPrefixHelper("ab", "ac")); + assertEquals(2, commonPrefixHelper("aa", "aaaa")); + assertEquals(4, commonPrefixHelper("aaaaa", "aaaab")); + } + + private int commonPrefixHelper(String a, String b) { + return RelativeKey.getCommonPrefix(new ArrayByteSequence(a), new ArrayByteSequence(b)); + } + + @Test + public void testReadWritePrefix() throws IOException { + Key prevKey = new Key("row1", "columnfamily1", "columnqualifier1", "columnvisibility1", 1000); + Key newKey = new Key("row2", "columnfamily2", "columnqualifier2", "columnvisibility2", 3000); + RelativeKey expected = new RelativeKey(prevKey, newKey); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + expected.write(out); + + RelativeKey actual = new RelativeKey(); + actual.setPrevKey(prevKey); + actual.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); + + assertEquals(expected.getKey(), actual.getKey()); + } + + private static ArrayList expectedKeys; + private static ArrayList expectedValues; + private static ArrayList expectedPositions; + private static ByteArrayOutputStream baos; + + @BeforeClass + public static void initSource() throws IOException { + int initialListSize = 10000; + + baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + + expectedKeys = new ArrayList(initialListSize); + expectedValues = new ArrayList(initialListSize); + expectedPositions = new ArrayList(initialListSize); + + Key prev = null; + int val = 0; + for (int row = 0; row < 4; row++) { + String rowS = RFileTest.nf("r_", row); + for (int cf = 0; cf < 4; cf++) { + String cfS = RFileTest.nf("cf_", cf); + for (int cq = 0; cq < 4; cq++) { + String cqS = RFileTest.nf("cq_", cq); + for (int cv = 'A'; cv < 'A' + 4; cv++) { + String cvS = "" + (char) cv; + for (int ts = 4; ts > 0; ts--) { + Key k = RFileTest.nk(rowS, cfS, cqS, cvS, ts); + k.setDeleted(true); + Value v = RFileTest.nv("" + val); + expectedPositions.add(out.size()); + new RelativeKey(prev, k).write(out); + prev = k; + v.write(out); + expectedKeys.add(k); + expectedValues.add(v); + + k = RFileTest.nk(rowS, cfS, cqS, cvS, ts); + v = RFileTest.nv("" + val); + expectedPositions.add(out.size()); + new RelativeKey(prev, k).write(out); + prev = k; + v.write(out); + expectedKeys.add(k); + expectedValues.add(v); + + val++; + } + } + } + } + } + } + + private DataInputStream in; + + @Before + public void setupDataInputStream() { + in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + in.mark(0); + } + + @Test + public void testSeekBeforeEverything() throws IOException { + Key seekKey = new Key(); + Key prevKey = new Key(); + Key currKey = null; + MByteSequence value = new MByteSequence(new byte[64], 0, 0); + + RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + assertEquals(1, skippr.skipped); + assertEquals(new Key(), skippr.prevKey); + assertEquals(expectedKeys.get(0), skippr.rk.getKey()); + assertEquals(expectedValues.get(0).toString(), value.toString()); + + // ensure we can advance after fastskip + skippr.rk.readFields(in); + assertEquals(expectedKeys.get(1), skippr.rk.getKey()); + + in.reset(); + + seekKey = new Key("a", "b", "c", "d", 1); + seekKey.setDeleted(true); + skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + assertEquals(1, skippr.skipped); + assertEquals(new Key(), skippr.prevKey); + assertEquals(expectedKeys.get(0), skippr.rk.getKey()); + assertEquals(expectedValues.get(0).toString(), value.toString()); + + skippr.rk.readFields(in); + assertEquals(expectedKeys.get(1), skippr.rk.getKey()); + } + + @Test(expected = EOFException.class) + public void testSeekAfterEverything() throws IOException { + Key seekKey = new Key("s", "t", "u", "v", 1); + Key prevKey = new Key(); + Key currKey = null; + MByteSequence value = new MByteSequence(new byte[64], 0, 0); + + RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + } + + @Test + public void testSeekMiddle() throws IOException { + int seekIndex = expectedKeys.size() / 2; + Key seekKey = expectedKeys.get(seekIndex); + Key prevKey = new Key(); + Key currKey = null; + MByteSequence value = new MByteSequence(new byte[64], 0, 0); + + RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + + assertEquals(seekIndex + 1, skippr.skipped); + assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey); + assertEquals(expectedKeys.get(seekIndex), skippr.rk.getKey()); + assertEquals(expectedValues.get(seekIndex).toString(), value.toString()); + + skippr.rk.readFields(in); + assertEquals(expectedValues.get(seekIndex + 1).toString(), value.toString()); + + // try fast skipping to a key that does not exist + in.reset(); + Key fKey = expectedKeys.get(seekIndex).followingKey(PartialKey.ROW_COLFAM_COLQUAL); + int i; + for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {} + + skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey); + assertEquals(i + 1, skippr.skipped); + assertEquals(expectedKeys.get(i - 1), skippr.prevKey); + assertEquals(expectedKeys.get(i), skippr.rk.getKey()); + assertEquals(expectedValues.get(i).toString(), value.toString()); + + // try fast skipping to our current location + skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i)); + assertEquals(0, skippr.skipped); + assertEquals(expectedKeys.get(i - 1), skippr.prevKey); + assertEquals(expectedKeys.get(i), skippr.rk.getKey()); + assertEquals(expectedValues.get(i).toString(), value.toString()); + + // try fast skipping 1 column family ahead from our current location, testing fastskip from middle of block as opposed to stating at beginning of block + fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM); + int j; + for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {} + skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i)); + assertEquals(j - i, skippr.skipped); + assertEquals(expectedKeys.get(j - 1), skippr.prevKey); + assertEquals(expectedKeys.get(j), skippr.rk.getKey()); + assertEquals(expectedValues.get(j).toString(), value.toString()); + + } }