fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject incubator-fluo git commit: Added some caching of string to bytes conversions for #800
Date Thu, 17 Nov 2016 20:22:10 GMT
Repository: incubator-fluo
Updated Branches:
  refs/heads/master 25839df16 -> c896fc16b


Added some caching of string to bytes conversions for #800

 * Added some localized per transactions caching of string to bytes conversions
 * Optimized hashCode(), equals(), and compareTo() methods for data objects in API
 * Made transaction reuse passed in Bytes and Column objects when returning data
 * Optimized Bytes.toString() by weakly remembering the string that created it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/c896fc16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/c896fc16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/c896fc16

Branch: refs/heads/master
Commit: c896fc16bb6ab0af4df9e26554516a70c1ffa302
Parents: 25839df
Author: Keith Turner <kturner@apache.org>
Authored: Wed Nov 16 19:32:22 2016 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Thu Nov 17 14:45:10 2016 -0500

----------------------------------------------------------------------
 .../fluo/api/client/AbstractSnapshotBase.java   |  38 ++++++-
 .../api/client/AbstractTransactionBase.java     |   6 +-
 .../java/org/apache/fluo/api/data/Bytes.java    |  36 +++++--
 .../java/org/apache/fluo/api/data/Column.java   |  18 +++-
 .../org/apache/fluo/api/data/ColumnValue.java   |  16 ++-
 .../org/apache/fluo/api/data/RowColumn.java     |  17 +++-
 .../apache/fluo/api/data/RowColumnValue.java    |  12 ++-
 .../org/apache/fluo/api/data/BytesTest.java     |  12 +++
 .../org/apache/fluo/core/impl/Notification.java |   8 +-
 .../fluo/core/impl/ParallelSnapshotScanner.java |  18 +++-
 .../apache/fluo/core/impl/TransactionImpl.java  |  16 ++-
 .../fluo/core/impl/scanner/CellScannerImpl.java |  38 +++++--
 .../core/impl/scanner/ColumnScannerImpl.java    |  14 +--
 .../fluo/core/impl/scanner/RowScannerImpl.java  |  15 ++-
 .../core/impl/scanner/ScannerBuilderImpl.java   |   4 +-
 .../org/apache/fluo/core/util/ByteUtil.java     |   4 +
 .../fluo/core/util/CachedBytesConverter.java    |  48 +++++++++
 .../fluo/core/util/CachedColumnConverter.java   | 100 +++++++++++++++++++
 .../org/apache/fluo/core/util/ColumnUtil.java   |   7 ++
 .../org/apache/fluo/core/util/Flutation.java    |   4 +-
 .../apache/fluo/integration/impl/ScannerIT.java |  39 ++++++++
 21 files changed, 409 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
index 30b325a..69780b2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
@@ -19,7 +19,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
+import java.util.WeakHashMap;
 
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableSet;
@@ -35,6 +37,33 @@ import org.apache.fluo.api.data.RowColumn;
 
 public abstract class AbstractSnapshotBase implements SnapshotBase {
 
+  /*
+   * This map of String to Bytes is really only useful when user code is executing a transactions.
+   * Once a transaction is queued for commit, do not want this map to eat up memory. Thats
why a
+   * weak map is used.
+   * 
+   * There is intentionally no reverse map from Bytes to String. Relying on two things for
this.
+   * First, Bytes maintains a weak pointer to the string it was created with and returns
this for
+   * toString(). Second, the actual Transaction implementation will under some circumstances
return
+   * the Bytes object that was passed in.
+   */
+  private Map<String, Bytes> s2bCache = new WeakHashMap<String, Bytes>();
+
+  Bytes s2bConv(CharSequence cs) {
+    Objects.requireNonNull(cs);
+    if (cs instanceof String) {
+      String s = (String) cs;
+      Bytes b = s2bCache.get(s);
+      if (b == null) {
+        b = Bytes.of(s);
+        s2bCache.put(s, b);
+      }
+      return b;
+    } else {
+      return Bytes.of(cs);
+    }
+  }
+
   public Bytes get(Bytes row, Column column, Bytes defaultValue) {
     Bytes ret = get(row, column);
     if (ret == null) {
@@ -59,8 +88,7 @@ public abstract class AbstractSnapshotBase implements SnapshotBase {
 
   public Map<String, Map<Column, String>> gets(Collection<? extends CharSequence>
rows,
       Set<Column> columns) {
-    Map<Bytes, Map<Column, Bytes>> rcvs =
-        get(Collections2.transform(rows, s -> Bytes.of(s)), columns);
+    Map<Bytes, Map<Column, Bytes>> rcvs = get(Collections2.transform(rows, this::s2bConv),
columns);
     Map<String, Map<Column, String>> ret = new HashMap<>(rcvs.size());
 
     for (Entry<Bytes, Map<Column, Bytes>> entry : rcvs.entrySet()) {
@@ -75,7 +103,7 @@ public abstract class AbstractSnapshotBase implements SnapshotBase {
   }
 
   public String gets(CharSequence row, Column column) {
-    Bytes val = get(Bytes.of(row), column);
+    Bytes val = get(s2bConv(row), column);
     if (val == null) {
       return null;
     }
@@ -83,7 +111,7 @@ public abstract class AbstractSnapshotBase implements SnapshotBase {
   }
 
   public String gets(CharSequence row, Column column, String defaultValue) {
-    Bytes val = get(Bytes.of(row), column);
+    Bytes val = get(s2bConv(row), column);
     if (val == null) {
       return defaultValue;
     }
@@ -92,7 +120,7 @@ public abstract class AbstractSnapshotBase implements SnapshotBase {
   }
 
   public Map<Column, String> gets(CharSequence row, Set<Column> columns) {
-    Map<Column, Bytes> values = get(Bytes.of(row), columns);
+    Map<Column, Bytes> values = get(s2bConv(row), columns);
     return Maps.transformValues(values, b -> b.toString());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
index 0ffd650..3507e3c 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
@@ -28,14 +28,14 @@ public abstract class AbstractTransactionBase extends AbstractSnapshotBase
imple
     TransactionBase {
 
   public void delete(CharSequence row, Column col) {
-    delete(Bytes.of(row), col);
+    delete(s2bConv(row), col);
   }
 
   public void set(CharSequence row, Column col, CharSequence value) throws AlreadySetException
{
-    set(Bytes.of(row), col, Bytes.of(value));
+    set(s2bConv(row), col, Bytes.of(value));
   }
 
   public void setWeakNotification(CharSequence row, Column col) {
-    setWeakNotification(Bytes.of(row), col);
+    setWeakNotification(s2bConv(row), col);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index 89b111b..077eac7 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
@@ -57,9 +58,11 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
   private final int offset;
   private final int length;
 
+  private WeakReference<String> utf8String;
+
   public static final Bytes EMPTY = new Bytes(new byte[0]);
 
-  private Integer hashCode = null;
+  private int hashCode = 0;
 
   public Bytes() {
     data = EMPTY.data;
@@ -73,6 +76,13 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
     this.length = data.length;
   }
 
+  private Bytes(byte[] data, String utf8String) {
+    this.data = data;
+    this.offset = 0;
+    this.length = data.length;
+    this.utf8String = new WeakReference<>(utf8String);
+  }
+
   private Bytes(byte[] data, int offset, int length) {
     if (offset < 0 || offset > data.length || length < 0 || (offset + length) >
data.length) {
       throw new IndexOutOfBoundsException(" Bad offset and/or length data.length = " + data.length
@@ -138,7 +148,16 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
    */
   @Override
   public String toString() {
-    return new String(data, offset, length, StandardCharsets.UTF_8);
+    if (utf8String != null) {
+      String s = utf8String.get();
+      if (s != null) {
+        return s;
+      }
+    }
+
+    String s = new String(data, offset, length, StandardCharsets.UTF_8);
+    utf8String = new WeakReference<>(s);
+    return s;
   }
 
   /**
@@ -202,13 +221,14 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
    */
   @Override
   public final boolean equals(Object other) {
+
+    if (this == other) {
+      return true;
+    }
+
     if (other instanceof Bytes) {
       Bytes ob = (Bytes) other;
 
-      if (this == other) {
-        return true;
-      }
-
       if (length() != ob.length()) {
         return false;
       }
@@ -220,7 +240,7 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
 
   @Override
   public final int hashCode() {
-    if (hashCode == null) {
+    if (hashCode == 0) {
       int hash = 1;
       for (int i = 0; i < length(); i++) {
         hash = (31 * hash) + byteAt(i);
@@ -318,7 +338,7 @@ public final class Bytes implements Comparable<Bytes>, Serializable
{
       return EMPTY;
     }
     byte[] data = s.getBytes(StandardCharsets.UTF_8);
-    return new Bytes(data);
+    return new Bytes(data, s);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
index 008e04c..7a330c4 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
@@ -33,6 +33,7 @@ public final class Column implements Comparable<Column>, Serializable
{
   private Bytes family = UNSET;
   private Bytes qualifier = UNSET;
   private Bytes visibility = UNSET;
+  private int hashCode = 0;
 
   public static final Column EMPTY = new Column();
 
@@ -172,11 +173,20 @@ public final class Column implements Comparable<Column>, Serializable
{
 
   @Override
   public int hashCode() {
-    return Objects.hash(family, qualifier, visibility);
+    if (hashCode == 0) {
+      hashCode = Objects.hash(family, qualifier, visibility);
+    }
+
+    return hashCode;
   }
 
   @Override
   public int compareTo(Column other) {
+
+    if (this == other) {
+      return 0;
+    }
+
     int result = family.compareTo(other.family);
     if (result == 0) {
       result = qualifier.compareTo(other.qualifier);
@@ -189,8 +199,14 @@ public final class Column implements Comparable<Column>, Serializable
{
 
   @Override
   public boolean equals(Object o) {
+
+    if (this == o) {
+      return true;
+    }
+
     if (o instanceof Column) {
       Column oc = (Column) o;
+
       return family.equals(oc.getFamily()) && qualifier.equals(oc.getQualifier())
           && visibility.equals(oc.getVisibility());
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
index 9d3d054..3bd3f69 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
@@ -27,6 +27,7 @@ public final class ColumnValue implements Serializable, Comparable<ColumnValue>
 
   private Column column;
   private Bytes val;
+  private int hashCode = 0;
 
   public ColumnValue(Column col, Bytes val) {
     this.column = col;
@@ -55,6 +56,11 @@ public final class ColumnValue implements Serializable, Comparable<ColumnValue>
 
   @Override
   public int compareTo(ColumnValue o) {
+
+    if (this == o) {
+      return 0;
+    }
+
     int comp = column.compareTo(o.column);
     if (comp == 0) {
       comp = val.compareTo(o.val);
@@ -64,6 +70,11 @@ public final class ColumnValue implements Serializable, Comparable<ColumnValue>
 
   @Override
   public boolean equals(Object o) {
+
+    if (this == o) {
+      return true;
+    }
+
     if (o instanceof ColumnValue) {
       ColumnValue ocv = (ColumnValue) o;
       return column.equals(ocv.column) && val.equals(ocv.val);
@@ -74,7 +85,10 @@ public final class ColumnValue implements Serializable, Comparable<ColumnValue>
 
   @Override
   public int hashCode() {
-    return Objects.hash(column, val);
+    if (hashCode == 0) {
+      hashCode = Objects.hash(column, val);
+    }
+    return hashCode;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
index 078ad5c..804f653 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
@@ -31,6 +31,7 @@ public final class RowColumn implements Comparable<RowColumn>, Serializable
{
 
   private Bytes row = Bytes.EMPTY;
   private Column col = Column.EMPTY;
+  private int hashCode = 0;
 
   /**
    * Constructs a RowColumn with row set to Bytes.EMPTY and column set to Column.EMPTY
@@ -113,6 +114,11 @@ public final class RowColumn implements Comparable<RowColumn>,
Serializable {
 
   @Override
   public boolean equals(Object o) {
+
+    if (this == o) {
+      return true;
+    }
+
     if (o instanceof RowColumn) {
       RowColumn other = (RowColumn) o;
       return row.equals(other.row) && col.equals(other.col);
@@ -122,7 +128,11 @@ public final class RowColumn implements Comparable<RowColumn>,
Serializable {
 
   @Override
   public int hashCode() {
-    return Objects.hash(row, col);
+    if (hashCode == 0) {
+      hashCode = Objects.hash(row, col);
+    }
+
+    return hashCode;
   }
 
   /**
@@ -158,6 +168,11 @@ public final class RowColumn implements Comparable<RowColumn>,
Serializable {
 
   @Override
   public int compareTo(RowColumn other) {
+
+    if (this == other) {
+      return 0;
+    }
+
     int result = row.compareTo(other.row);
     if (result == 0) {
       result = col.compareTo(other.col);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
index 455271e..d9aedd6 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
@@ -29,6 +29,7 @@ public final class RowColumnValue implements Comparable<RowColumnValue>,
Seriali
   private Bytes row = Bytes.EMPTY;
   private Column col = Column.EMPTY;
   private Bytes val = Bytes.EMPTY;
+  private int hashCode = 0;
 
   public RowColumnValue(Bytes row, Column col, Bytes val) {
     this.row = row;
@@ -87,11 +88,15 @@ public final class RowColumnValue implements Comparable<RowColumnValue>,
Seriali
 
   @Override
   public int hashCode() {
-    return Objects.hash(row, col, val);
+    if (hashCode == 0) {
+      hashCode = Objects.hash(row, col, val);
+    }
+    return hashCode;
   }
 
   @Override
   public boolean equals(Object o) {
+
     if (o == this) {
       return true;
     }
@@ -110,6 +115,11 @@ public final class RowColumnValue implements Comparable<RowColumnValue>,
Seriali
 
   @Override
   public int compareTo(RowColumnValue o) {
+
+    if (this == o) {
+      return 0;
+    }
+
     int result = row.compareTo(o.row);
     if (result == 0) {
       result = col.compareTo(o.col);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
----------------------------------------------------------------------
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
index 720a1a3..14d2e76 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
@@ -211,4 +211,16 @@ public class BytesTest {
     AsciiSequence cs2 = new AsciiSequence("");
     Assert.assertSame(Bytes.EMPTY, Bytes.of(cs2));
   }
+
+  @Test
+  public void testSameString() {
+    String s1 = "abc";
+    String s2 = "xyZ";
+
+    Bytes b1 = Bytes.of(s1);
+    Bytes b2 = Bytes.of(s2);
+
+    Assert.assertSame(s1, b1.toString());
+    Assert.assertSame(s2, b2.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
index 17c3e00..3d2728c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
@@ -43,6 +43,7 @@ public class Notification {
 
   private final RowColumn rowCol;
   private final long timestamp;
+  private static final byte[] NOTIFY_CF_ARRAY = ColumnConstants.NOTIFY_CF.toArray();
 
   public Notification(Bytes row, Column col, long ts) {
     rowCol = new RowColumn(row, col);
@@ -72,15 +73,14 @@ public class Notification {
   public Flutation newDelete(Environment env, long ts) {
     Flutation m = new Flutation(env, rowCol.getRow());
     ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(rowCol.getColumn());
-    m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(rowCol.getColumn()), cv,
-        encodeTs(ts, true), TransactionImpl.EMPTY);
+    m.put(NOTIFY_CF_ARRAY, encodeCol(rowCol.getColumn()), cv, encodeTs(ts, true),
+        TransactionImpl.EMPTY);
     return m;
   }
 
   public static void put(Environment env, Mutation m, Column col, long ts) {
     ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(col);
-    m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(col), cv, encodeTs(ts, false),
-        TransactionImpl.EMPTY);
+    m.put(NOTIFY_CF_ARRAY, encodeCol(col), cv, encodeTs(ts, false), TransactionImpl.EMPTY);
   }
 
   public static Notification from(Key k) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 8f9b3af..16470f2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -23,9 +23,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Function;
 
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -34,6 +36,9 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.CachedBytesConverter;
+import org.apache.fluo.core.util.CachedColumnConverter;
+import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.UtilWaitThread;
 
 public class ParallelSnapshotScanner {
@@ -44,6 +49,8 @@ public class ParallelSnapshotScanner {
   private Set<Column> columns;
   private TxStats stats;
   private List<Range> rangesToScan = new ArrayList<>();
+  private Function<ByteSequence, Bytes> rowConverter;
+  private Function<Key, Column> columnConverter;
 
   ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment
env,
       long startTs, TxStats stats) {
@@ -52,6 +59,8 @@ public class ParallelSnapshotScanner {
     this.env = env;
     this.startTs = startTs;
     this.stats = stats;
+    this.rowConverter = new CachedBytesConverter(rows);
+    this.columnConverter = new CachedColumnConverter(columns);
   }
 
   ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs,
TxStats stats) {
@@ -71,6 +80,8 @@ public class ParallelSnapshotScanner {
     this.env = env;
     this.startTs = startTs;
     this.stats = stats;
+    this.rowConverter = ByteUtil::toBytes;
+    this.columnConverter = ColumnUtil::convert;
   }
 
   private BatchScanner setupBatchScanner() {
@@ -156,11 +167,8 @@ public class ParallelSnapshotScanner {
     BatchScanner bs = setupBatchScanner();
     try {
       for (Entry<Key, Value> entry : bs) {
-        Bytes row = ByteUtil.toBytes(entry.getKey().getRowData());
-        Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
-        Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
-
-        Column col = new Column(cf, cq, ByteUtil.toBytes(entry.getKey().getColumnVisibilityData()));
+        Bytes row = rowConverter.apply(entry.getKey().getRowData());
+        Column col = columnConverter.apply(entry.getKey());
 
         long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 07a14b4..e8398f8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -49,6 +48,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -58,7 +58,6 @@ import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.AlreadySetException;
@@ -70,7 +69,6 @@ import org.apache.fluo.core.async.AsyncTransaction;
 import org.apache.fluo.core.async.SyncCommitObserver;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.exceptions.StaleScanException;
-import org.apache.fluo.core.impl.scanner.ColumnScannerImpl;
 import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
 import org.apache.fluo.core.oracle.Stamp;
 import org.apache.fluo.core.util.ColumnUtil;
@@ -246,16 +244,14 @@ public class TransactionImpl extends AbstractTransactionBase implements
AsyncTra
 
     Map<Column, Bytes> ret = new HashMap<>();
 
-    Iterable<ColumnValue> scanner =
-        Iterables.transform(new SnapshotScanner(env, opts, startTs, stats),
-            ColumnScannerImpl::entry2cv);
-    for (ColumnValue cv : scanner) {
+    for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats)) {
+      Column col = ColumnUtil.convert(kve.getKey());
       if (shouldCopy) {
-        if (columns.contains(cv.getColumn())) {
-          ret.put(cv.getColumn(), cv.getValue());
+        if (columns.contains(col)) {
+          ret.put(col, Bytes.of(kve.getValue().get()));
         }
       } else {
-        ret.put(cv.getColumn(), cv.getValue());
+        ret.put(col, Bytes.of(kve.getValue().get()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
index e85a2f0..07d511c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
@@ -15,10 +15,14 @@
 
 package org.apache.fluo.core.impl.scanner;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.function.Function;
 
 import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.api.client.scanner.CellScanner;
@@ -26,27 +30,45 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.CachedColumnConverter;
+import org.apache.fluo.core.util.ColumnUtil;
 
 public class CellScannerImpl implements CellScanner {
 
   private Iterable<Entry<Key, Value>> snapshot;
+  private Function<Key, Column> columnConverter;
 
-  private static RowColumnValue entry2rcv(Entry<Key, Value> entry) {
-    Bytes row = ByteUtil.toBytes(entry.getKey().getRowData());
-    Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
-    Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
-    Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
-    Column col = new Column(cf, cq, cv);
+  private static final ByteSequence EMPTY_BS = new ArrayByteSequence(new byte[0]);
+  private ByteSequence prevRowBs = EMPTY_BS;
+  private Bytes prevRowBytes = Bytes.EMPTY;
+
+  private RowColumnValue entry2rcv(Entry<Key, Value> entry) {
+
+    ByteSequence rowBS = entry.getKey().getRowData();
+    Bytes row;
+    if (prevRowBs.equals(rowBS)) {
+      row = prevRowBytes;
+    } else {
+      prevRowBs = rowBS;
+      prevRowBytes = row = ByteUtil.toBytes(rowBS);
+    }
+
+    Column col = columnConverter.apply(entry.getKey());
     Bytes val = Bytes.of(entry.getValue().get());
     return new RowColumnValue(row, col, val);
   }
 
-  CellScannerImpl(Iterable<Entry<Key, Value>> snapshot) {
+  CellScannerImpl(Iterable<Entry<Key, Value>> snapshot, Collection<Column>
columns) {
     this.snapshot = snapshot;
+    if (columns.size() == 0) {
+      columnConverter = ColumnUtil::convert;
+    } else {
+      columnConverter = new CachedColumnConverter(columns);
+    }
   }
 
   @Override
   public Iterator<RowColumnValue> iterator() {
-    return Iterators.transform(snapshot.iterator(), CellScannerImpl::entry2rcv);
+    return Iterators.transform(snapshot.iterator(), this::entry2rcv);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
index 21ee94c..bea449a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
@@ -17,6 +17,7 @@ package org.apache.fluo.core.impl.scanner;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.function.Function;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
@@ -31,11 +32,8 @@ import org.apache.fluo.core.util.ByteUtil;
 
 public class ColumnScannerImpl implements ColumnScanner {
 
-  public static ColumnValue entry2cv(Entry<Key, Value> entry) {
-    Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
-    Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
-    Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
-    Column col = new Column(cf, cq, cv);
+  public ColumnValue entry2cv(Entry<Key, Value> entry) {
+    Column col = columnConverter.apply(entry.getKey());
     Bytes val = Bytes.of(entry.getValue().get());
     return new ColumnValue(col, val);
   }
@@ -44,11 +42,13 @@ public class ColumnScannerImpl implements ColumnScanner {
   private Bytes row;
   private Iterator<ColumnValue> iter;
   private boolean gotIter = false;
+  private Function<Key, Column> columnConverter;
 
-  ColumnScannerImpl(Iterator<Entry<Key, Value>> e) {
+  ColumnScannerImpl(Iterator<Entry<Key, Value>> e, Function<Key, Column>
columnConverter) {
     peekingIter = Iterators.peekingIterator(e);
+    this.columnConverter = columnConverter;
     row = ByteUtil.toBytes(peekingIter.peek().getKey().getRowData());
-    iter = Iterators.transform(peekingIter, ColumnScannerImpl::entry2cv);
+    iter = Iterators.transform(peekingIter, this::entry2cv);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
index 631ac5d..063c2b0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
@@ -15,8 +15,10 @@
 
 package org.apache.fluo.core.impl.scanner;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.function.Function;
 
 import com.google.common.collect.Iterators;
 import org.apache.accumulo.core.client.RowIterator;
@@ -24,18 +26,27 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.core.util.CachedColumnConverter;
+import org.apache.fluo.core.util.ColumnUtil;
 
 public class RowScannerImpl implements RowScanner {
 
   private Iterable<Entry<Key, Value>> snapshot;
+  private Function<Key, Column> columnConverter;
 
-  RowScannerImpl(Iterable<Entry<Key, Value>> snapshot) {
+  RowScannerImpl(Iterable<Entry<Key, Value>> snapshot, Collection<Column>
columns) {
     this.snapshot = snapshot;
+    if (columns.size() == 0) {
+      columnConverter = ColumnUtil::convert;
+    } else {
+      columnConverter = new CachedColumnConverter(columns);
+    }
   }
 
   @Override
   public Iterator<ColumnScanner> iterator() {
     RowIterator rowiter = new RowIterator(snapshot.iterator());
-    return Iterators.transform(rowiter, e -> new ColumnScannerImpl(e));
+    return Iterators.transform(rowiter, e -> new ColumnScannerImpl(e, columnConverter));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
index 8c833d5..21db046 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
@@ -74,7 +74,7 @@ public class ScannerBuilderImpl implements ScannerBuilder {
   @Override
   public CellScanner build() {
     SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
-    return new CellScannerImpl(snapScanner);
+    return new CellScannerImpl(snapScanner, columns);
   }
 
   @Override
@@ -83,7 +83,7 @@ public class ScannerBuilderImpl implements ScannerBuilder {
       @Override
       public RowScanner build() {
         SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
-        return new RowScannerImpl(snapScanner);
+        return new RowScannerImpl(snapScanner, columns);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ByteUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ByteUtil.java
index 3fcb2c9..f6e9db4 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ByteUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ByteUtil.java
@@ -57,6 +57,10 @@ public class ByteUtil {
    * @return Bytes object
    */
   public static Bytes toBytes(ByteSequence bs) {
+    if (bs.length() == 0) {
+      return Bytes.EMPTY;
+    }
+
     if (bs.isBackedByArray()) {
       return Bytes.of(bs.getBackingArray(), bs.offset(), bs.length());
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/util/CachedBytesConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CachedBytesConverter.java
b/modules/core/src/main/java/org/apache/fluo/core/util/CachedBytesConverter.java
new file mode 100644
index 0000000..dc99f84
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CachedBytesConverter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.fluo.api.data.Bytes;
+
+public class CachedBytesConverter implements Function<ByteSequence, Bytes> {
+
+  private Map<ByteSequence, Bytes> bs2bCache = new HashMap<>();
+
+  public CachedBytesConverter(Collection<Bytes> bytesCollection) {
+    for (Bytes bytes : bytesCollection) {
+      bs2bCache.put(ByteUtil.toByteSequence(bytes), bytes);
+    }
+  }
+
+  @Override
+  public Bytes apply(ByteSequence bs) {
+    if (bs.length() == 0) {
+      return Bytes.EMPTY;
+    }
+
+    Bytes b = bs2bCache.get(bs);
+    if (b == null) {
+      return ByteUtil.toBytes(bs);
+    }
+    return b;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/util/CachedColumnConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CachedColumnConverter.java
b/modules/core/src/main/java/org/apache/fluo/core/util/CachedColumnConverter.java
new file mode 100644
index 0000000..8415e01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CachedColumnConverter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+public class CachedColumnConverter implements Function<Key, Column> {
+  private Map<ColumnKey, Column> colCache = new HashMap<>();
+  private ColumnKey resuableKey = new ColumnKey();
+
+  private static class ColumnKey {
+    private ByteSequence family;
+    private ByteSequence qualifier;
+    private ByteSequence vis;
+    private int hashCode = 0;
+
+    ColumnKey() {}
+
+    ColumnKey(Column col) {
+      this.family = ByteUtil.toByteSequence(col.getFamily());
+      this.qualifier = ByteUtil.toByteSequence(col.getQualifier());
+      this.vis = ByteUtil.toByteSequence(col.getVisibility());
+      this.hashCode = 0;
+    }
+
+    void reset(ByteSequence family, ByteSequence qualifier, ByteSequence vis) {
+      this.family = family;
+      this.qualifier = qualifier;
+      this.vis = vis;
+      this.hashCode = 0;
+    }
+
+    @Override
+    public int hashCode() {
+      if (hashCode == 0) {
+        hashCode = family.hashCode();
+        hashCode = 31 * hashCode + qualifier.hashCode();
+        hashCode = 31 * hashCode + vis.hashCode();
+      }
+
+      return hashCode;
+    }
+
+    public boolean equals(Object o) {
+      if (o instanceof ColumnKey) {
+        ColumnKey ock = (ColumnKey) o;
+        return family.equals(ock.family) && qualifier.equals(ock.qualifier) &&
vis.equals(ock.vis);
+      }
+
+      return false;
+    }
+  }
+
+  public CachedColumnConverter(Collection<Column> cols) {
+    for (Column col : cols) {
+      colCache.put(new ColumnKey(col), col);
+    }
+  }
+
+  @Override
+  public Column apply(Key k) {
+    return toColumn(k.getColumnFamilyData(), k.getColumnQualifierData(),
+        k.getColumnVisibilityData());
+  }
+
+  private Column toColumn(ByteSequence family, ByteSequence qualifier, ByteSequence vis)
{
+    resuableKey.reset(family, qualifier, vis);
+    Column col = colCache.get(resuableKey);
+
+    if (col == null) {
+      Bytes f = ByteUtil.toBytes(family);
+      Bytes q = ByteUtil.toBytes(qualifier);
+      Bytes v = ByteUtil.toBytes(vis);
+      return new Column(f, q, v);
+    }
+
+    return col;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index b934f0b..be583da 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -115,4 +115,11 @@ public class ColumnUtil {
     Bytes visibility = ByteUtil.read(bb, in);
     return new Column(family, qualifier, visibility);
   }
+
+  public static Column convert(Key k) {
+    Bytes f = ByteUtil.toBytes(k.getColumnFamilyData());
+    Bytes q = ByteUtil.toBytes(k.getColumnQualifierData());
+    Bytes v = ByteUtil.toBytes(k.getColumnVisibilityData());
+    return new Column(f, q, v);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/core/src/main/java/org/apache/fluo/core/util/Flutation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/Flutation.java b/modules/core/src/main/java/org/apache/fluo/core/util/Flutation.java
index 4daeb68..0c00b17 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/Flutation.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/Flutation.java
@@ -16,7 +16,6 @@
 package org.apache.fluo.core.util;
 
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -50,7 +49,6 @@ public class Flutation extends Mutation {
       cv = new ColumnVisibility(ByteUtil.toText(col.getVisibility()));
     }
 
-    m.put(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()), cv, ts,
new Value(
-        val));
+    m.put(col.getFamily().toArray(), col.getQualifier().toArray(), cv, ts, val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/c896fc16/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
index 386ab6f..6d62177 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -25,6 +25,7 @@ import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumnValue;
@@ -94,6 +95,44 @@ public class ScannerIT extends ITBaseImpl {
   }
 
   @Test
+  public void testSame() {
+    Set<RowColumnValue> expected = genData();
+
+    Column col1 = new Column("f1", "q1");
+    Column col2 = new Column("f2", "q3");
+
+    HashSet<RowColumnValue> expectedC = new HashSet<>();
+    Iterables.addAll(
+        expectedC,
+        Iterables.filter(expected,
+            rcv -> rcv.getColumn().equals(col1) || rcv.getColumn().equals(col2)));
+    Assert.assertEquals(3, expectedC.size());
+
+    try (Snapshot snap = client.newSnapshot()) {
+      CellScanner scanner = snap.scanner().fetch(col1, col2).build();
+
+      HashSet<RowColumnValue> actual = new HashSet<>();
+      Bytes prevRow = null;
+      for (RowColumnValue rcv : scanner) {
+        actual.add(rcv);
+
+        Column c = rcv.getColumn();
+
+        Assert.assertTrue((col1.equals(c) && col1 == c) || (col2.equals(c) &&
col2 == c));
+
+        if (col2.equals(c)) {
+          Assert.assertEquals(Bytes.of("r1"), rcv.getRow());
+          Assert.assertSame(rcv.getRow(), prevRow);
+        }
+
+        prevRow = rcv.getRow();
+      }
+
+      Assert.assertEquals(expectedC, actual);
+    }
+  }
+
+  @Test
   public void testMultipleIteratorsFromSameRowScanner() {
     Set<RowColumnValue> expected = genData();
 



Mime
View raw message