accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-1124 Shorten keys in index and uses statistics to choose better keys
Date Fri, 27 May 2016 16:14:26 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master a15ff128b -> f3b9d1925


ACCUMULO-1124 Shorten keys in index and uses statistics to choose better keys


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ac7b4a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ac7b4a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ac7b4a9

Branch: refs/heads/master
Commit: 1ac7b4a9ffe0ebd5e56c4bbb672eea1ba79dd688
Parents: fd37134
Author: Keith Turner <kturner@apache.org>
Authored: Fri May 27 11:54:43 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri May 27 11:54:43 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/KeyShortener.java  | 138 +++++++++++++++++
 .../accumulo/core/file/rfile/PrintInfo.java     |  86 +++++++++--
 .../apache/accumulo/core/file/rfile/RFile.java  |  64 ++++++--
 .../core/file/rfile/RFileOperations.java        |   2 +-
 .../accumulo/core/file/rfile/RelativeKey.java   |   8 +-
 .../core/file/rfile/KeyShortenerTest.java       | 147 +++++++++++++++++++
 .../core/file/rfile/RFileMetricsTest.java       |   2 +-
 .../accumulo/core/file/rfile/RFileTest.java     |  67 ++++++++-
 .../core/file/rfile/RelativeKeyTest.java        |  28 +++-
 9 files changed, 499 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
new file mode 100644
index 0000000..b039982
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Bytes;
+
+/*
+ * Code to shorten keys that will be placed into RFile indexes. This code attempts to find a key that's between two keys and is shorter.
+ */
+public class KeyShortener {
+
+  private static final byte[] EMPTY = new byte[0];
+  private static final byte[] B00 = new byte[] {(byte) 0x00};
+  private static final byte[] BFF = new byte[] {(byte) 0xff};
+
+  private static final Logger log = LoggerFactory.getLogger(KeyShortener.class);
+
+  private KeyShortener() {}
+
+  private static int findNonFF(ByteSequence bs, int start) {
+    for (int i = start; i < bs.length(); i++) {
+      if (bs.byteAt(i) != (byte) 0xff) {
+        return i;
+      }
+    }
+
+    return bs.length();
+  }
+
+  /*
+   * return S such that prev < S < current or null if no such sequence
+   */
+  public static ByteSequence shorten(ByteSequence prev, ByteSequence current) {
+
+    int minLen = Math.min(prev.length(), current.length());
+
+    for (int i = 0; i < minLen; i++) {
+      int pb = 0xff & prev.byteAt(i);
+      int cb = 0xff & current.byteAt(i);
+
+      int diff = cb - pb;
+
+      if (diff == 1) {
+        int newLen = findNonFF(prev, i + 1);
+        byte[] successor;
+        if (newLen < prev.length()) {
+          successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), BFF);
+        } else {
+          successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), B00);
+        }
+        return new ArrayByteSequence(successor);
+      } else if (diff > 1) {
+        byte[] copy = new byte[i + 1];
+        System.arraycopy(prev.subSequence(0, i + 1).toArray(), 0, copy, 0, i + 1);
+        copy[i] = (byte) ((0xff & copy[i]) + 1);
+        return new ArrayByteSequence(copy);
+      }
+    }
+
+    ArrayByteSequence successor = new ArrayByteSequence(Bytes.concat(prev.toArray(), B00));
+    if (successor.equals(current)) {
+      return null;
+    }
+
+    return successor;
+  }
+
+  /*
+   * This entire class supports an optional optimization. This code does a sanity check to
ensure the optimization code did what was intended, doing a noop if
+   * there is a bug.
+   */
+  @VisibleForTesting
+  static Key sanityCheck(Key prev, Key current, Key shortened) {
+    if (prev.compareTo(shortened) >= 0) {
+      log.warn("Bug in key shortening code, please open an issue " + prev + " >= " + shortened);
+      return prev;
+    }
+
+    if (current.compareTo(shortened) <= 0) {
+      log.warn("Bug in key shortening code, please open an issue " + current + " <= "
+ shortened);
+      return prev;
+    }
+
+    return shortened;
+  }
+
+  /*
+   * Find a key K where prev < K < current AND K is shorter. If no K that meets the criteria can be found, then prev is returned.
+   */
+  public static Key shorten(Key prev, Key current) {
+    Preconditions.checkArgument(prev.compareTo(current) <= 0, "Expected key less than
or equal. " + prev + " > " + current);
+
+    if (prev.getRowData().compareTo(current.getRowData()) < 0) {
+      ByteSequence shortenedRow = shorten(prev.getRowData(), current.getRowData());
+      if (shortenedRow == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(shortenedRow.toArray(), EMPTY, EMPTY, EMPTY,
0));
+    } else if (prev.getColumnFamilyData().compareTo(current.getColumnFamilyData()) < 0)
{
+      ByteSequence shortenedFam = shorten(prev.getColumnFamilyData(), current.getColumnFamilyData());
+      if (shortenedFam == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), shortenedFam.toArray(),
EMPTY, EMPTY, 0));
+    } else if (prev.getColumnQualifierData().compareTo(current.getColumnQualifierData())
< 0) {
+      ByteSequence shortenedQual = shorten(prev.getColumnQualifierData(), current.getColumnQualifierData());
+      if (shortenedQual == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), prev.getColumnFamilyData().toArray(),
shortenedQual.toArray(), EMPTY, 0));
+    } else {
+      return prev;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index cfe571c..e166557 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,12 +58,52 @@ public class PrintInfo implements KeywordExecutable {
     boolean histogram = false;
     @Parameter(names = {"--useSample"}, description = "Use sample data for --dump, --vis,
--histogram options")
     boolean useSample = false;
+    @Parameter(names = {"--keyStats"}, description = "print key length statistics for index
and all data")
+    boolean keyStats = false;
     @Parameter(description = " <file> { <file> ... }")
     List<String> files = new ArrayList<String>();
     @Parameter(names = {"-c", "--config"}, variableArity = true, description = "Comma-separated
Hadoop configuration files")
     List<String> configFiles = new ArrayList<>();
   }
 
+  static class LogHistogram {
+    long countBuckets[] = new long[11];
+    long sizeBuckets[] = new long[countBuckets.length];
+    long totalSize = 0;
+
+    public void add(int size) {
+      int bucket = (int) Math.log10(size);
+      countBuckets[bucket]++;
+      sizeBuckets[bucket] += size;
+      totalSize += size;
+    }
+
+    public void print(String indent) {
+      System.out.println(indent + "Up to size      count      %-age");
+      for (int i = 1; i < countBuckets.length; i++) {
+        System.out.println(String.format("%s%11.0f : %10d %6.2f%%", indent, Math.pow(10,
i), countBuckets[i], sizeBuckets[i] * 100. / totalSize));
+      }
+    }
+  }
+
+  static class KeyStats {
+    private SummaryStatistics stats = new SummaryStatistics();
+    private LogHistogram logHistogram = new LogHistogram();
+
+    public void add(Key k) {
+      int size = k.getSize();
+      stats.addValue(size);
+      logHistogram.add(size);
+    }
+
+    public void print(String indent) {
+      logHistogram.print(indent);
+      System.out.println();
+      System.out.printf("%smin:%,11.2f max:%,11.2f avg:%,11.2f stddev:%,11.2f\n", indent,
stats.getMin(), stats.getMax(), stats.getMean(),
+          stats.getStandardDeviation());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new PrintInfo().execute(args);
   }
@@ -90,9 +131,10 @@ public class PrintInfo implements KeywordExecutable {
     FileSystem hadoopFs = FileSystem.get(conf);
     FileSystem localFs = FileSystem.getLocal(conf);
 
-    long countBuckets[] = new long[11];
-    long sizeBuckets[] = new long[countBuckets.length];
-    long totalSize = 0;
+    LogHistogram kvHistogram = new LogHistogram();
+
+    KeyStats dataKeyStats = new KeyStats();
+    KeyStats indexKeyStats = new KeyStats();
 
     for (String arg : opts.files) {
       Path path = new Path(arg);
@@ -119,7 +161,7 @@ public class PrintInfo implements KeywordExecutable {
 
       Map<String,ArrayList<ByteSequence>> localityGroupCF = null;
 
-      if (opts.histogram || opts.dump || opts.vis || opts.hash) {
+      if (opts.histogram || opts.dump || opts.vis || opts.hash || opts.keyStats) {
         localityGroupCF = iter.getLocalityGroupCF();
 
         FileSKVIterator dataIter;
@@ -134,6 +176,14 @@ public class PrintInfo implements KeywordExecutable {
           dataIter = iter;
         }
 
+        if (opts.keyStats) {
+          FileSKVIterator indexIter = iter.getIndex();
+          while (indexIter.hasTop()) {
+            indexKeyStats.add(indexIter.getTopKey());
+            indexIter.next();
+          }
+        }
+
         for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet())
{
 
           dataIter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
@@ -146,30 +196,36 @@ public class PrintInfo implements KeywordExecutable {
                 return;
             }
             if (opts.histogram) {
-              long size = key.getSize() + value.getSize();
-              int bucket = (int) Math.log10(size);
-              countBuckets[bucket]++;
-              sizeBuckets[bucket] += size;
-              totalSize += size;
+              kvHistogram.add(key.getSize() + value.getSize());
+            }
+            if (opts.keyStats) {
+              dataKeyStats.add(key);
             }
             dataIter.next();
           }
         }
       }
-      System.out.println();
 
       iter.close();
 
-      if (opts.vis || opts.hash)
+      if (opts.vis || opts.hash) {
+        System.out.println();
         vmg.printMetrics(opts.hash, "Visibility", System.out);
+      }
 
       if (opts.histogram) {
-        System.out.println("Up to size      count      %-age");
-        for (int i = 1; i < countBuckets.length; i++) {
-          System.out.println(String.format("%11.0f : %10d %6.2f%%", Math.pow(10, i), countBuckets[i],
sizeBuckets[i] * 100. / totalSize));
-        }
+        System.out.println();
+        kvHistogram.print("");
       }
 
+      if (opts.keyStats) {
+        System.out.println();
+        System.out.println("Statistics for keys in data :");
+        dataKeyStats.print("\t");
+        System.out.println();
+        System.out.println("Statistics for keys in index :");
+        indexKeyStats.print("\t");
+      }
       // If the output stream has closed, there is no reason to keep going.
       if (System.out.checkError())
         return;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index a73936b..6032c7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +90,10 @@ public class RFile {
   static final int RINDEX_VER_8 = 8; // Added sample storage. There is a sample locality
group for each locality group. Sample are built using a Sampler and
                                      // sampler configuration. The Sampler and its configuration
are stored in RFile. Persisting the method of producing the
                                      // sample allows a user of RFile to determine if the
sample is useful.
+                                     //
+                                     // Selected smaller keys for index by doing two things.
First internal stats were used to look for keys that were below
+                                     // average in size for the index. Also keys that were
statistically large were excluded from the index. Second shorter keys
+                                     // (that may not exist in data) were generated for the
index.
   static final int RINDEX_VER_7 = 7; // Added support for prefix encoding and encryption.
Before this change only exact matches within a key field were deduped
                                      // for consecutive keys. After this change, if consecutive
key fields have the same prefix then the prefix is only stored
                                      // once.
@@ -375,7 +380,8 @@ public class RFile {
     private ABlockWriter blockWriter;
 
     // private BlockAppender blockAppender;
-    private long blockSize = 100000;
+    private final long blockSize;
+    private final long maxBlockSize;
     private int entries = 0;
 
     private LocalityGroupMetadata currentLocalityGroup = null;
@@ -386,13 +392,23 @@ public class RFile {
 
     private SampleLocalityGroupWriter sample;
 
-    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, LocalityGroupMetadata
currentLocalityGroup, SampleLocalityGroupWriter sample) {
+    private SummaryStatistics keyLenStats = new SummaryStatistics();
+    private double avergageKeySize = 0;
+
+    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata
currentLocalityGroup,
+        SampleLocalityGroupWriter sample) {
       this.fileWriter = fileWriter;
       this.blockSize = blockSize;
+      this.maxBlockSize = maxBlockSize;
       this.currentLocalityGroup = currentLocalityGroup;
       this.sample = sample;
     }
 
+    private boolean isGiantKey(Key k) {
+      // consider a key that's more than 3 standard deviations above the mean of previously seen key sizes as giant
+      return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation()
* 3;
+    }
+
     public void append(Key key, Value value) throws IOException {
 
       if (key.compareTo(prevKey) < 0) {
@@ -412,8 +428,22 @@ public class RFile {
       if (blockWriter == null) {
         blockWriter = fileWriter.prepareDataBlock();
       } else if (blockWriter.getRawSize() > blockSize) {
-        closeBlock(prevKey, false);
-        blockWriter = fileWriter.prepareDataBlock();
+
+        // Look for a key that's short to put in the index, defining short as average or below.
+        if (avergageKeySize == 0) {
+          // use the same average for the search for a below average key for a block
+          avergageKeySize = keyLenStats.getMean();
+        }
+
+        // Possibly produce a shorter key that does not exist in data. Even if a key can
be shortened, it may not be below average.
+        Key closeKey = KeyShortener.shorten(prevKey, key);
+
+        if ((closeKey.getSize() <= avergageKeySize || blockWriter.getRawSize() > maxBlockSize)
&& !isGiantKey(closeKey)) {
+          closeBlock(closeKey, false);
+          blockWriter = fileWriter.prepareDataBlock();
+          // set average to zero so it's recomputed for the next block
+          avergageKeySize = 0;
+        }
       }
 
       RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
@@ -422,6 +452,8 @@ public class RFile {
       value.write(blockWriter);
       entries++;
 
+      keyLenStats.addValue(key.getSize());
+
       prevKey = new Key(key);
       lastKeyInBlock = prevKey;
 
@@ -457,12 +489,14 @@ public class RFile {
   public static class Writer implements FileSKVWriter {
 
     public static final int MAX_CF_IN_DLG = 1000;
+    private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
     private BlockFileWriter fileWriter;
 
     // private BlockAppender blockAppender;
-    private long blockSize = 100000;
-    private int indexBlockSize;
+    private final long blockSize;
+    private final long maxBlockSize;
+    private final int indexBlockSize;
 
     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
     private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
@@ -487,6 +521,7 @@ public class RFile {
 
     public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl
samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
+      this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
       this.indexBlockSize = indexBlockSize;
       this.fileWriter = bfw;
       previousColumnFamilies = new HashSet<ByteSequence>();
@@ -603,9 +638,9 @@ public class RFile {
 
       SampleLocalityGroupWriter sampleWriter = null;
       if (sampler != null) {
-        sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter,
blockSize, sampleLocalityGroup, null), sampler);
+        sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter,
blockSize, maxBlockSize, sampleLocalityGroup, null), sampler);
       }
-      lgWriter = new LocalityGroupWriter(fileWriter, blockSize, currentLocalityGroup, sampleWriter);
+      lgWriter = new LocalityGroupWriter(fileWriter, blockSize, maxBlockSize, currentLocalityGroup,
sampleWriter);
     }
 
     @Override
@@ -838,7 +873,7 @@ public class RFile {
           reseek = false;
         }
 
-        if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey())
<= 0) {
+        if (entriesLeft > 0 && startKey.compareTo(getTopKey()) >= 0 &&
startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
           // start key is within the unconsumed portion of the current block
 
           // this code intentionally does not use the index associated with a cached block
@@ -848,7 +883,7 @@ public class RFile {
           // and speed up others.
 
           MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
-          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey(),
entriesLeft);
           if (skippr.skipped > 0) {
             entriesLeft -= skippr.skipped;
             val = new Value(valbs.toArray());
@@ -859,6 +894,13 @@ public class RFile {
           reseek = false;
         }
 
+        if (entriesLeft == 0 && startKey.compareTo(getTopKey()) > 0 &&
startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
+          // In the empty space at the end of a block. This can occur when keys are shortened
in the index creating index entries that do not exist in the
+          // block. These shortened index entries fall between the last key in a block and the first key in the next block, but may not exist in the data.
+          // Just proceed to the next block.
+          reseek = false;
+        }
+
         if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) &&
startKey.compareTo(firstKey) <= 0) {
           // seeking before the beginning of the file, and already positioned at the first
key in the file
           // so there is nothing to do
@@ -921,7 +963,7 @@ public class RFile {
             }
           }
 
-          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey,
entriesLeft);
           prevKey = skippr.prevKey;
           entriesLeft -= skippr.skipped;
           val = new Value(valbs.toArray());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index c8b61b6..cc6aaa2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -29,10 +29,10 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index aeba4e2..98163b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -230,11 +230,7 @@ public class RelativeKey implements Writable {
     }
   }
 
-  public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key
prevKey, Key currKey) throws IOException {
-    // this method assumes that fast skip is being called on a compressed block where the
last key
-    // in the compressed block is >= seekKey... therefore this method shouldn't go past
the end of the
-    // compressed block... if it does, there is probably an error in the caller's logic
-
+  public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key
prevKey, Key currKey, int entriesLeft) throws IOException {
     // this method mostly avoids object allocation and only does compares when the row changes
 
     MutableByteSequence row, cf, cq, cv;
@@ -307,7 +303,7 @@ public class RelativeKey implements Writable {
     int count = 0;
     Key newPrevKey = null;
 
-    while (true) {
+    while (count < entriesLeft) {
 
       pdel = (fieldsSame & DELETED) == DELETED;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
new file mode 100644
index 0000000..67ff70c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.accumulo.core.data.Key;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.primitives.Bytes;
+
+public class KeyShortenerTest {
+
+  private static final byte[] E = new byte[0];
+  private static final byte[] FF = new byte[] {(byte) 0xff};
+
+  private void assertBetween(Key p, Key s, Key c) {
+    Assert.assertTrue(p.compareTo(s) < 0);
+    Assert.assertTrue(s.compareTo(c) < 0);
+  }
+
+  private void testKeys(Key prev, Key current, Key expected) {
+    Key sk = KeyShortener.shorten(prev, current);
+    assertBetween(prev, sk, current);
+  }
+
+  /**
+   * append 0xff to end of string
+   */
+  private byte[] aff(String s) {
+    return Bytes.concat(s.getBytes(), FF);
+  }
+
+  /**
+   * append 0x00 to end of string
+   */
+  private byte[] a00(String s) {
+    return Bytes.concat(s.getBytes(), new byte[] {(byte) 0x00});
+  }
+
+  private byte[] toBytes(Object o) {
+    if (o instanceof String) {
+      return ((String) o).getBytes();
+    } else if (o instanceof byte[]) {
+      return (byte[]) o;
+    }
+
+    throw new IllegalArgumentException();
+  }
+
+  private Key nk(Object row, Object fam, Object qual, long ts) {
+    return new Key(toBytes(row), toBytes(fam), toBytes(qual), E, ts);
+  }
+
+  @Test
+  public void testOneCharacterDifference() {
+    // row has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hbhahaha", "f89222",
"q90232e"), nk(aff("r321ha"), E, E, 0));
+
+    // family has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89322",
"q90232e"), nk("r321hahahaha", aff("f892"), E, 0));
+
+    // qualifier has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222",
"q91232e"), nk("r321hahahaha", "f89222", aff("q90"), 0));
+  }
+
+  @Test
+  public void testMultiCharacterDifference() {
+    // row has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hchahaha", "f89222",
"q90232e"), nk("r321hb", E, E, 0));
+
+    // family has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89422",
"q90232e"), nk("r321hahahaha", "f893", E, 0));
+
+    // qualifier has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222",
"q92232e"), nk("r321hahahaha", "f89222", "q91", 0));
+  }
+
+  @Test
+  public void testOneCharacterDifferenceAndFF() {
+    byte[] ff1 = Bytes.concat(aff("mop"), "b".getBytes());
+    byte[] ff2 = Bytes.concat(aff("mop"), FF, "b".getBytes());
+
+    byte[] eff1 = Bytes.concat(aff("mop"), FF, FF);
+    byte[] eff2 = Bytes.concat(aff("mop"), FF, FF, FF);
+
+    testKeys(nk(ff1, "f89222", "q90232e", 34), new Key("mor56", "f89222", "q90232e"), nk(eff1,
E, E, 0));
+    testKeys(nk("r1", ff1, "q90232e", 34), new Key("r1", "mor56", "q90232e"), nk("r1", eff1,
E, 0));
+    testKeys(nk("r1", "f1", ff1, 34), new Key("r1", "f1", "mor56"), nk("r1", "f1", eff1,
0));
+
+    testKeys(nk(ff2, "f89222", "q90232e", 34), new Key("mor56", "f89222", "q90232e"), nk(eff2,
E, E, 0));
+    testKeys(nk("r1", ff2, "q90232e", 34), new Key("r1", "mor56", "q90232e"), nk("r1", eff2,
E, 0));
+    testKeys(nk("r1", "f1", ff2, 34), new Key("r1", "f1", "mor56"), nk("r1", "f1", eff2,
0));
+
+  }
+
+  @Test
+  public void testOneCharacterDifferenceAtEnd() {
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahahb", "f89222",
"q90232e"), nk(a00("r321hahahaha"), E, E, 0));
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89223",
"q90232e"), nk("r321hahahaha", a00("f89222"), E, 0));
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222",
"q90232f"), nk("r321hahahaha", "f89222", a00("q90232e"), 0));
+  }
+
+  @Test
+  public void testSamePrefix() {
+    testKeys(new Key("r3boot4", "f89222", "q90232e"), new Key("r3boot452", "f89222", "q90232e"),
nk(a00("r3boot4"), E, E, 0));
+    testKeys(new Key("r3boot4", "f892", "q90232e"), new Key("r3boot4", "f89222", "q90232e"),
nk("r3boot4", a00("f892"), E, 0));
+    testKeys(new Key("r3boot4", "f89222", "q902"), new Key("r3boot4", "f89222", "q90232e"),
nk("r3boot4", "f89222", a00("q902"), 0));
+  }
+
+  @Test
+  public void testSamePrefixAnd00() {
+    Key prev = new Key("r3boot4", "f89222", "q90232e");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk(a00("r3boot4"), "f89222", "q90232e",
8)));
+    prev = new Key("r3boot4", "f892", "q90232e");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", a00("f892"), "q90232e",
8)));
+    prev = new Key("r3boot4", "f89222", "q902");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", "f89222", a00("q902"),
8)));
+  }
+
+  @Test
+  public void testSanityCheck1() {
+    // prev and shortened equal
+    Key prev = new Key("r001", "f002", "q006");
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r002", "f002", "q006"),
new Key("r001", "f002", "q006")));
+    // prev > shortened equal
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"),
new Key("r001", "f002", "q006")));
+    // current and shortened equal
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"),
new Key("r003", "f002", "q006")));
+    // shortened > current
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"),
new Key("r004", "f002", "q006")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
index 7f8c087..92a1d32 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
@@ -459,7 +459,7 @@ public class RFileMetricsTest {
   public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException {
     // test an rfile with multiple column families and multiple blocks in a non-default locality
group and the default locality group
 
-    trf.openWriter(false, 20);// Each entry is a block
+    trf.openWriter(false, 10);// Each entry is a block
     Set<ByteSequence> lg1 = new HashSet<>();
     lg1.add(new ArrayByteSequence("cf1"));
     lg1.add(new ArrayByteSequence("cf3"));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 5f66503..d97a4db 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -57,6 +58,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -86,7 +88,6 @@ import com.google.common.hash.HashCode;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.Bytes;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
 
 public class RFileTest {
 
@@ -241,7 +242,7 @@ public class RFileTest {
     }
 
     public void openWriter() throws IOException {
-      openWriter(true, 1000);
+      openWriter(1000);
     }
 
     public void openWriter(int blockSize) throws IOException {
@@ -385,6 +386,8 @@ public class RFileTest {
             String cvS = "" + (char) cv;
             for (int ts = 4; ts > 0; ts--) {
               Key k = nk(rowS, cfS, cqS, cvS, ts);
+              // the check below ensures that, when all key sizes are the same, more than one index block is created
+              Assert.assertEquals(27, k.getSize());
               k.setDeleted(true);
               Value v = nv("" + val);
               trf.writer.append(k, v);
@@ -392,6 +395,7 @@ public class RFileTest {
               expectedValues.add(v);
 
               k = nk(rowS, cfS, cqS, cvS, ts);
+              Assert.assertEquals(27, k.getSize());
               v = nv("" + val);
               trf.writer.append(k, v);
               expectedKeys.add(k);
@@ -500,6 +504,15 @@ public class RFileTest {
       }
     }
 
+    // count the number of index entries
+    FileSKVIterator iiter = trf.reader.getIndex();
+    int count = 0;
+    while (iiter.hasTop()) {
+      count++;
+      iiter.next();
+    }
+    Assert.assertEquals(20, count);
+
     trf.closeReader();
   }
 
@@ -2091,6 +2104,56 @@ public class RFileTest {
   }
 
   @Test
+  public void testBigKeys() throws IOException {
+    // this test ensures that big keys do not end up in the index
+    ArrayList<Key> keys = new ArrayList<>();
+
+    // 1000 keys with short rows r000000 .. r000999
+    for (int i = 0; i < 1000; i++) {
+      String row = String.format("r%06d", i);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    // add a few keys with long rows (every 100th row, extended with 1000 'b' characters)
+    for (int i = 0; i < 1000; i += 100) {
+      String row = String.format("r%06d", i);
+      char ca[] = new char[1000];
+      Arrays.fill(ca, 'b');
+      row = row + new String(ca);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    // the keys are appended to the writer below in sorted order
+    Collections.sort(keys);
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+
+    // the value stored for each key is the string form of the key's hashCode
+    for (Key k : keys) {
+      trf.writer.append(k, new Value((k.hashCode() + "").getBytes()));
+    }
+
+    trf.writer.close();
+
+    trf.openReader();
+
+    // every key found in the index must have a size below 20, i.e. none of the long keys made it in
+    FileSKVIterator iiter = trf.reader.getIndex();
+    while (iiter.hasTop()) {
+      Key k = iiter.getTopKey();
+      Assert.assertTrue(k + " " + k.getSize() + " >= 20", k.getSize() < 20);
+      iiter.next();
+    }
+
+    // seek to every written key (long ones included) in random order and verify key and value
+    Collections.shuffle(keys);
+
+    for (Key key : keys) {
+      trf.reader.seek(new Range(key, null), EMPTY_COL_FAMS, false);
+      Assert.assertTrue(trf.reader.hasTop());
+      Assert.assertEquals(key, trf.reader.getTopKey());
+      Assert.assertEquals(new Value((key.hashCode() + "").getBytes()), trf.reader.getTopValue());
+    }
+  }
+
+  @Test
   public void testCryptoDoesntLeakSensitive() throws IOException {
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     // test an empty file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index e413448..4ca4b6c 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.junit.Before;
@@ -178,7 +179,7 @@ public class RelativeKeyTest {
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey,
expectedKeys.size());
     assertEquals(1, skippr.skipped);
     assertEquals(new Key(), skippr.prevKey);
     assertEquals(expectedKeys.get(0), skippr.rk.getKey());
@@ -192,7 +193,7 @@ public class RelativeKeyTest {
 
     seekKey = new Key("a", "b", "c", "d", 1);
     seekKey.setDeleted(true);
-    skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
     assertEquals(1, skippr.skipped);
     assertEquals(new Key(), skippr.prevKey);
     assertEquals(expectedKeys.get(0), skippr.rk.getKey());
@@ -203,13 +204,23 @@ public class RelativeKeyTest {
   }
 
   @Test(expected = EOFException.class)
+  public void testSeekAfterEverythingWrongCount() throws IOException {
+    Key seekKey = new Key("s", "t", "u", "v", 1);
+    Key prevKey = new Key();
+    Key currKey = null;
+    MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
+
+    // the count passed in is one more than the number of expected keys; the test passes only if
+    // fastSkip then throws EOFException
+    RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size() + 1);
+  }
+
   public void testSeekAfterEverything() throws IOException {
     Key seekKey = new Key("s", "t", "u", "v", 1);
     Key prevKey = new Key();
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
+    assertEquals(expectedKeys.size(), skippr.skipped);
   }
 
   @Test
@@ -220,7 +231,7 @@ public class RelativeKeyTest {
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey,
expectedKeys.size());
 
     assertEquals(seekIndex + 1, skippr.skipped);
     assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey);
@@ -236,14 +247,17 @@ public class RelativeKeyTest {
     int i;
     for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {}
 
-    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey);
+    int left = expectedKeys.size();
+
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey, expectedKeys.size());
     assertEquals(i + 1, skippr.skipped);
+    left -= skippr.skipped;
     assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(i), skippr.rk.getKey());
     assertEquals(expectedValues.get(i).toString(), value.toString());
 
     // try fast skipping to our current location
-    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1),
expectedKeys.get(i));
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1),
expectedKeys.get(i), left);
     assertEquals(0, skippr.skipped);
     assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(i), skippr.rk.getKey());
@@ -253,7 +267,7 @@ public class RelativeKeyTest {
     fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM);
     int j;
     for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {}
-    skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i));
+    skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i),
left);
     assertEquals(j - i, skippr.skipped);
     assertEquals(expectedKeys.get(j - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(j), skippr.rk.getKey());


Mime
View raw message