crunch-commits mailing list archives

From chao...@apache.org
Subject git commit: CRUNCH-267: Fix several HFileUtils#scanHFiles related problems
Date Fri, 20 Sep 2013 04:06:56 GMT
Updated Branches:
  refs/heads/master 995f4d985 -> 013f2e19a


CRUNCH-267: Fix several HFileUtils#scanHFiles related problems


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/013f2e19
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/013f2e19
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/013f2e19

Branch: refs/heads/master
Commit: 013f2e19a7d666caca352cb3874e407e682a2436
Parents: 995f4d9
Author: Chao Shi <chaoshi@apache.org>
Authored: Fri Sep 20 11:29:32 2013 +0800
Committer: Chao Shi <chaoshi@apache.org>
Committed: Fri Sep 20 11:29:32 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 73 +++++++++++++++++++
 .../crunch/io/hbase/HFileInputFormat.java       | 45 +++++++-----
 .../io/hbase/HFileOutputFormatForCrunch.java    |  4 +-
 .../org/apache/crunch/io/hbase/HFileSource.java | 38 ++++++++++
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 75 +++++++++++++++++---
 5 files changed, 205 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 9363aba..f45bbf9 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -53,6 +53,7 @@ import static org.apache.crunch.types.writable.Writables.strings;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class HFileSourceIT implements Serializable {
@@ -81,6 +82,7 @@ public class HFileSourceIT implements Serializable {
     conf = tmpDir.getDefaultConfiguration();
   }
 
+  @Test
   public void testHFileSource() throws IOException {
     List<KeyValue> kvs = generateKeyValues(100);
     Path inputPath = tmpDir.getPath("in");
@@ -160,6 +162,42 @@ public class HFileSourceIT implements Serializable {
   }
 
   @Test
+  public void testScanHFiles_startRowIsTooSmall() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW1);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(2, results.size());
+    assertArrayEquals(ROW2, results.get(0).getRow());
+    assertArrayEquals(ROW3, results.get(1).getRow());
+  }
+
+  @Test
+  public void testScanHFiles_startRowIsTooLarge() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(0, results.size());
+  }
+
+  @Test
+  public void testScanHFiles_startRowDoesNotExist() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY3, QUALIFIER3, 0, VALUE3));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    assertArrayEquals(ROW3, results.get(0).getRow());
+  }
+
+  @Test
   public void testScanHFiles_familyMap() throws IOException {
     List<KeyValue> kvs = ImmutableList.of(
         new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
@@ -192,6 +230,41 @@ public class HFileSourceIT implements Serializable {
     assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
   }
 
+  @Test
+  public void testScanHFiles_delete() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.Delete));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    assertArrayEquals(VALUE1, results.get(0).getValue(FAMILY1, QUALIFIER1));
+  }
+
+  @Test
+  public void testScanHFiles_deleteColumn() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteColumn));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(0, results.size());
+  }
+
+  @Test
+  public void testScanHFiles_deleteFamily() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER3, 3, VALUE3),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteFamily));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    assertNull(results.get(0).getValue(FAMILY1, QUALIFIER1));
+    assertNull(results.get(0).getValue(FAMILY1, QUALIFIER2));
+    assertArrayEquals(VALUE3, results.get(0).getValue(FAMILY1, QUALIFIER3));
+  }
+
  private List<Result> doTestScanHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
     Path inputPath = tmpDir.getPath("in");
     writeKeyValuesToHFile(inputPath, kvs);
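
The new start-row tests above all route through doTestScanHFiles: write a fixed list of
KeyValues into an HFile, scan it with HFileUtils.scanHFiles, and materialize the Results.
A minimal sketch of that flow outside the test harness (the pipeline setup, path, and row
bytes below are illustrative assumptions, not part of this patch):

    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);  // org.apache.crunch.impl.mr.MRPipeline
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row2"));  // rows sorting before "row2" are skipped
    PCollection<Result> results = HFileUtils.scanHFiles(pipeline, new Path("/tmp/hfiles"), scan);
    for (Result r : results.materialize()) {
      // each Result carries one row's surviving cells, with deletes already applied
      System.out.println(Bytes.toStringBinary(r.getRow()));
    }
    pipeline.done();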

http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index 07b4b15..9ced0ac 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -17,10 +17,10 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -62,7 +63,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
    * a more general purpose utility; it accounts for the presence of metadata files created
    * in the way we're doing exports.
    */
-  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
     public boolean accept(Path p) {
       String name = p.getName();
       return !name.startsWith("_") && !name.startsWith(".");
@@ -86,6 +87,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     private byte[] stopRow = null;
     private boolean reachedStopRow = false;
     private long count;
+    private boolean seeked = false;
 
     @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
@@ -93,16 +95,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
       conf = context.getConfiguration();
       Path path = fileSplit.getPath();
       FileSystem fs = path.getFileSystem(conf);
-
-      long fileSize = fs.getFileStatus(path).getLen();
-
-      // Open the underlying input stream; it will be closed by the HFileReader below.
-      FSDataInputStream iStream = fs.open(path);
-      FixedFileTrailer fileTrailer = FixedFileTrailer.readFromStream(iStream, fileSize);
-
-      // If this class is generalized, it may need to account for different data block encodings.
-      this.in = new HFileReaderV2(path, fileTrailer, iStream, iStream, fileSize, true, new CacheConfig(conf),
-          DataBlockEncoding.NONE, new  HFileSystem(fs));
+      LOG.info("Initialize HFileRecordReader for " + path);
+      this.in = HFile.createReader(fs, path, new CacheConfig(conf));
 
       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
@@ -133,15 +127,16 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
         return false;
       }
       boolean hasNext;
-      if (!scanner.isSeeked()) {
+      if (!seeked) {
         if (startRow != null) {
           LOG.info("Seeking to start row " + Bytes.toStringBinary(startRow));
           KeyValue kv = KeyValue.createFirstOnRow(startRow);
-          hasNext = (scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) >= 0);
+          hasNext = seekAtOrAfter(scanner, kv);
         } else {
           LOG.info("Seeking to start");
           hasNext = scanner.seekTo();
         }
+        seeked = true;
       } else {
         hasNext = scanner.next();
       }
@@ -182,18 +177,34 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     public void close() throws IOException {
       in.close();
     }
+
+    // This method is copied from o.a.h.hbase.regionserver.StoreFileScanner, as we don't want
+    // to depend on it.
+    private static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
+        throws IOException {
+      int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+      if(result < 0) {
+        // Passed KV is smaller than first KV in file, work from start of file
+        return s.seekTo();
+      } else if(result > 0) {
+        // Passed KV is larger than current KV in file, if there is a next
+        // it is the "after", if not then this scanner is done.
+        return s.next();
+      }
+      // Seeked to the exact key
+      return true;
+    }
   }
 
   @Override
   protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
 
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-
    // Explode out directories that match the original FileInputFormat filters since HFiles are written to directories where the
     // directory name is the column name
     for (FileStatus status : super.listStatus(job)) {
       if (status.isDir()) {
+        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
         for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
           result.add(match);
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 70f10d5..e7bca2b 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,10 +19,10 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index c1d15a2..31d314d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -20,11 +20,16 @@ package org.apache.crunch.io.hbase;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -37,6 +42,7 @@ import static org.apache.crunch.types.writable.Writables.writables;
 
 public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSource<KeyValue> {
 
+  private static final Log LOG = LogFactory.getLog(HFileSource.class);
   private static final PType<KeyValue> KEY_VALUE_PTYPE = writables(KeyValue.class);
 
   public HFileSource(Path path) {
@@ -79,4 +85,36 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
   public String toString() {
     return "HFile(" + pathsAsString() + ")";
   }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // HFiles are stored into <family>/<file>, but the default implementation does not support this.
+    // This is used for estimating the number of reducers. (Otherwise we will always get 1 reducer.)
+    long sum = 0;
+    for (Path path : getPaths()) {
+      try {
+        sum += getSizeInternal(conf, path);
+      } catch (IOException e) {
+        LOG.warn("Failed to estimate size of " + path);
+      }
+    }
+    return sum;
+  }
+
+  private long getSizeInternal(Configuration conf, Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] statuses = fs.listStatus(path, HFileInputFormat.HIDDEN_FILE_FILTER);
+    if (statuses == null) {
+      return 0;
+    }
+    long sum = 0;
+    for (FileStatus status : statuses) {
+      if (status.isDir()) {
+        sum += SourceTargetHelper.getPathSize(fs, status.getPath());
+      } else {
+        sum += status.getLen();
+      }
+    }
+    return sum;
+  }
 }
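
The getSize override matters because an HFileSource path is a directory whose HFiles sit
one level down, in one subdirectory per column family, while the FileSourceImpl default
only measures the top-level path. Given a bulk-load layout such as (paths illustrative):

    /hfiles/f1/2b1f3c...   <- HFile for family f1
    /hfiles/f2/9d07aa...   <- HFile for family f2
    /hfiles/_SUCCESS       <- skipped by HIDDEN_FILE_FILTER

the override counts both HFiles and ignores the marker file, so Crunch's reducer estimate
sees the real input size instead of a near-zero one. The walk over a single path is
equivalent to this sketch:

    FileSystem fs = FileSystem.get(conf);
    long sum = 0;
    for (FileStatus s : fs.listStatus(new Path("/hfiles"), HFileInputFormat.HIDDEN_FILE_FILTER)) {
      sum += s.isDir() ? SourceTargetHelper.getPathSize(fs, s.getPath()) : s.getLen();
    }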

http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index e2f1520..2428c16 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.hbase;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.DoFn;
@@ -53,6 +54,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -67,6 +69,51 @@ public final class HFileUtils {
 
   private static final Log LOG = LogFactory.getLog(HFileUtils.class);
 
+  /** Compares {@code KeyValue}s by family, qualifier, timestamp (in reverse order), type (in reverse order) and memstoreTS. */
+  private static final Comparator<KeyValue> KEY_VALUE_COMPARATOR = new Comparator<KeyValue>() {
+    @Override
+    public int compare(KeyValue l, KeyValue r) {
+      int cmp;
+      if ((cmp = compareFamily(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareQualifier(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareTimestamp(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareType(l, r)) != 0) {
+        return cmp;
+      }
+      return compareMemstoreTS(l, r);
+    }
+
+    private int compareFamily(KeyValue l, KeyValue r) {
+      return Bytes.compareTo(
+          l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
+          r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength());
+    }
+
+    private int compareQualifier(KeyValue l, KeyValue r) {
+      return Bytes.compareTo(
+          l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
+          r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
+    }
+
+    private int compareTimestamp(KeyValue l, KeyValue r) {
+      return -Longs.compare(l.getTimestamp(), r.getTimestamp());
+    }
+
+    private int compareType(KeyValue l, KeyValue r) {
+      return (int) r.getType() - (int) l.getType();
+    }
+
+    private int compareMemstoreTS(KeyValue l, KeyValue r) {
+      return Longs.compare(l.getMemstoreTS(), r.getMemstoreTS());
+    }
+  };
+
   private static class FilterByFamilyFn extends FilterFn<KeyValue> {
 
     private final byte[] family;
@@ -241,10 +288,12 @@ public final class HFileUtils {
    * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
    */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
-    // TODO(chaoshi): HFileInputFormat may skip some HFiles if their KVs do not fall into
-    //                the range specified by this scan.
-    PCollection<KeyValue> in = pipeline.read(new HFileSource(ImmutableList.of(path), scan));
-    return combineIntoRow(in, scan);
+    return scanHFiles(pipeline, ImmutableList.of(path), scan);
+  }
+
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan));
+    return combineIntoRow(in, scan);
   }
 
  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) {
@@ -393,8 +442,8 @@ public final class HFileUtils {
 
     kvs = maybeDeleteFamily(kvs);
 
-    // In-place sort KeyValues by family, qualifier and then timestamp reversely.
-    Collections.sort(kvs, KeyValue.COMPARATOR);
+    // Sort KeyValues in place by family, qualifier, and then timestamp in reverse order (on timestamp ties, deletes appear first).
+    Collections.sort(kvs, KEY_VALUE_COMPARATOR);
 
     List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
     for (int i = 0, j; i < kvs.size(); i = j) {
@@ -404,6 +453,9 @@ public final class HFileUtils {
       }
       results.addAll(getLatestKeyValuesOfColumn(kvs.subList(i, j), versions));
     }
+    if (results.isEmpty()) {
+      return null;
+    }
     return new Result(results);
   }
 
@@ -412,7 +464,7 @@ public final class HFileUtils {
    * delete family timestamp. Also removes the delete family {@code KeyValue}s.
    */
   private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
-    long deleteFamilyCut = 0;
+    long deleteFamilyCut = -1;
     for (KeyValue kv : kvs) {
       if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
         deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
@@ -437,7 +489,7 @@ public final class HFileUtils {
   private static boolean hasSameFamilyAndQualifier(KeyValue l, KeyValue r) {
     return Bytes.equals(
         l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
-        r.getBuffer(), r.getFamilyOffset(), r.getFamilyOffset())
+        r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength())
         && Bytes.equals(
         l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
         r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
@@ -467,9 +519,10 @@ public final class HFileUtils {
       }
       if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
         break;
-      } else if (kv.getType() == KeyValue.Type.Put.getCode()
-          && kv.getTimestamp() != previousDeleteTimestamp) {
-        results.add(kv);
+      } else if (kv.getType() == KeyValue.Type.Put.getCode()) {
+        if (kv.getTimestamp() != previousDeleteTimestamp) {
+          results.add(kv);
+        }
       } else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
         previousDeleteTimestamp = kv.getTimestamp();
       } else {
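
The switch from KeyValue.COMPARATOR to the custom KEY_VALUE_COMPARATOR is what makes the
timestamp-tie cases (e.g. testScanHFiles_delete above) come out right: Delete has a
larger type code than Put, and compareType sorts type descending, so on equal timestamps
the tombstone is visited first and getLatestKeyValuesOfColumn records
previousDeleteTimestamp before reaching the Put it masks. (Row bytes are not compared;
the comparator is only ever applied to the cells of a single row.) A small demonstration
of the ordering, as a sketch that would live inside HFileUtils since the comparator is
private:

    byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
    List<KeyValue> kvs = Lists.newArrayList(
        new KeyValue(row, fam, qual, 1L, Bytes.toBytes("v1")),
        new KeyValue(row, fam, qual, 2L, Bytes.toBytes("v2")),
        new KeyValue(row, fam, qual, 2L, KeyValue.Type.Delete));
    Collections.sort(kvs, KEY_VALUE_COMPARATOR);
    // Order is now: Delete@2, Put@2 ("v2"), Put@1 ("v1") -- the delete masks "v2" and
    // only "v1" survives, matching testScanHFiles_delete.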

