fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject [1/2] incubator-fluo-recipes git commit: Fixes #82 - Moved TypeLayer from Fluo API to Fluo Recipes
Date Thu, 07 Jul 2016 20:51:28 GMT
Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 93f482aa4 -> 640c51716


Fixes #82 - Moved TypeLayer from Fluo API to Fluo Recipes


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/777db493
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/777db493
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/777db493

Branch: refs/heads/master
Commit: 777db493ce55f6b9b40b5434106456fbb4014d49
Parents: 93f482a
Author: Mike Walch <mwalch@gmail.com>
Authored: Fri Jul 1 12:05:45 2016 -0400
Committer: Mike Walch <mwalch@gmail.com>
Committed: Fri Jul 1 12:05:45 2016 -0400

----------------------------------------------------------------------
 .../org/apache/fluo/recipes/types/Encoder.java  |  86 +++
 .../fluo/recipes/types/StringEncoder.java       |  86 +++
 .../apache/fluo/recipes/types/TypeLayer.java    | 488 ++++++++++++++++
 .../apache/fluo/recipes/types/TypedLoader.java  |  45 ++
 .../fluo/recipes/types/TypedObserver.java       |  46 ++
 .../fluo/recipes/types/TypedSnapshot.java       |  38 ++
 .../fluo/recipes/types/TypedSnapshotBase.java   | 562 +++++++++++++++++++
 .../fluo/recipes/types/TypedTransaction.java    |  46 ++
 .../recipes/types/TypedTransactionBase.java     | 278 +++++++++
 .../apache/fluo/recipes/types/MockSnapshot.java |  30 +
 .../fluo/recipes/types/MockSnapshotBase.java    | 202 +++++++
 .../fluo/recipes/types/MockTransaction.java     |  36 ++
 .../fluo/recipes/types/MockTransactionBase.java |  90 +++
 .../fluo/recipes/types/TypeLayerTest.java       | 494 ++++++++++++++++
 14 files changed, 2527 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java
new file mode 100644
index 0000000..6b1b626
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using desired encoding
+ *
+ * @since 1.0.0
+ */
+public interface Encoder {
+
+  /**
+   * Encodes an integer to {@link Bytes}
+   */
+  Bytes encode(int i);
+
+  /**
+   * Encodes a long to {@link Bytes}
+   */
+  Bytes encode(long l);
+
+  /**
+   * Encodes a String to {@link Bytes}
+   */
+  Bytes encode(String s);
+
+  /**
+   * Encodes a float to {@link Bytes}
+   */
+  Bytes encode(float f);
+
+  /**
+   * Encodes a double to {@link Bytes}
+   */
+  Bytes encode(double d);
+
+  /**
+   * Encodes a boolean to {@link Bytes}
+   */
+  Bytes encode(boolean b);
+
+  /**
+   * Decodes an integer from {@link Bytes}
+   */
+  int decodeInteger(Bytes b);
+
+  /**
+   * Decodes a long from {@link Bytes}
+   */
+  long decodeLong(Bytes b);
+
+  /**
+   * Decodes a String from {@link Bytes}
+   */
+  String decodeString(Bytes b);
+
+  /**
+   * Decodes a float from {@link Bytes}
+   */
+  float decodeFloat(Bytes b);
+
+  /**
+   * Decodes a double from {@link Bytes}
+   */
+  double decodeDouble(Bytes b);
+
+  /**
+   * Decodes a boolean from {@link Bytes}
+   */
+  boolean decodeBoolean(Bytes b);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java
new file mode 100644
index 0000000..10524ed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using a String encoding
+ *
+ * @since 1.0.0
+ */
+public class StringEncoder implements Encoder {
+
+  @Override
+  public Bytes encode(int i) {
+    return encode(Integer.toString(i));
+  }
+
+  @Override
+  public Bytes encode(long l) {
+    return encode(Long.toString(l));
+  }
+
+  @Override
+  public Bytes encode(String s) {
+    return Bytes.of(s);
+  }
+
+  @Override
+  public Bytes encode(float f) {
+    return encode(Float.toString(f));
+  }
+
+  @Override
+  public Bytes encode(double d) {
+    return encode(Double.toString(d));
+  }
+
+  @Override
+  public Bytes encode(boolean b) {
+    return encode(Boolean.toString(b));
+  }
+
+  @Override
+  public int decodeInteger(Bytes b) {
+    return Integer.parseInt(decodeString(b));
+  }
+
+  @Override
+  public long decodeLong(Bytes b) {
+    return Long.parseLong(decodeString(b));
+  }
+
+  @Override
+  public String decodeString(Bytes b) {
+    return b.toString();
+  }
+
+  @Override
+  public float decodeFloat(Bytes b) {
+    return Float.parseFloat(decodeString(b));
+  }
+
+  @Override
+  public double decodeDouble(Bytes b) {
+    return Double.parseDouble(decodeString(b));
+  }
+
+  @Override
+  public boolean decodeBoolean(Bytes b) {
+    return Boolean.parseBoolean(decodeString(b));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java
new file mode 100644
index 0000000..0bd189f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * A simple convenience layer for Fluo. This layer attempts to make the following common operations
+ * easier.
+ * 
+ * <UL>
+ * <LI>Working with different types.
+ * <LI>Supplying default values
+ * <LI>Dealing with null return types.
+ * <LI>Working with row/column and column maps
+ * </UL>
+ * 
+ * <p>
+ * This layer was intentionally loosely coupled with the basic API. This allows other convenience
+ * layers for Fluo to build directly on the basic API w/o having to consider the particulars of this
+ * layer. Also its expected that integration with other languages may only use the basic API.
+ * </p>
+ * 
+ * <h3>Using</h3>
+ * 
+ * <p>
+ * A TypeLayer is created with a certain encoder that is used for converting from bytes to
+ * primitives and visa versa. In order to ensure that all of your code uses the same encoder, its
+ * probably best to centralize the choice of an encoder within your project. There are many ways do
+ * to this, below is an example of one way to centralize and use.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   public class MyTypeLayer extends TypeLayer {
+ *     public MyTypeLayer() {
+ *       super(new MyEncoder());
+ *     }
+ *   }
+ *   
+ *   public class MyObserver extends TypedObserver {
+ *     MyObserver(){
+ *       super(new MyTypeLayer());
+ *     }
+ *     
+ *     public abstract void process(TypedTransaction tx, Bytes row, Column col){
+ *       //do something w/ typed transaction
+ *     }
+ *   }
+ *   
+ *   public class MyUtil {
+ *      //A little util to print out some stuff
+ *      public void printStuff(Snapshot snap, byte[] row){
+ *        TypedSnapshot tsnap = new MytTypeLayer().wrap(snap);
+ *        
+ *        System.out.println(tsnap.get().row(row).fam("b90000").qual(137).toString("NP"));
+ *      } 
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Working with different types</h3>
+ * 
+ * <p>
+ * The following example code shows using the basic fluo API with different types.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(Transaction tx, byte[] row, byte[] cf, int cq, long val){
+ *     tx.set(Bytes.of(row), new Column(Bytes.of(cf), Bytes.of(Integer.toString(cq))),
+ *        Bytes.of(Long.toString(val));
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. Because row(), fam(), qual(), and set() each take many different types, this
+ * enables many different permutations that would not be achievable with overloading.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(TypedTransaction tx, byte[] r, byte[] cf, int cq, long v){
+ *     tx.mutate().row(r).fam(cf).qual(cq).set(v);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Default values</h3>
+ * 
+ * <p>
+ * The following example code shows using the basic fluo API to read a value and default to zero if
+ * it does not exist.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(Transaction tx, byte[] row, Column col, long amount){
+ *     
+ *     long balance = 0;
+ *     Bytes bval = tx.get(Bytes.of(row), col);
+ *     if(bval != null)
+ *       balance = Long.parseLong(bval.toString());
+ *     
+ *     balance += amount;
+ *     
+ *     tx.set(Bytes.of(row), col, Bytes.of(Long.toString(amount)));
+ *     
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. This code avoids the null check by supplying a default value of zero.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     long balance = tx.get().row(r).col(c).toLong(0);
+ *     balance += amount;
+ *     tx.mutate().row(r).col(c).set(balance);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * For this particular case, shorter code can be written by using the increment method.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     tx.mutate().row(r).col(c).increment(amount);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Null return types</h3>
+ * 
+ * <p>
+ * When using the basic API, you must ensure the return type is not null before converting a string
+ * or long.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(Transaction tx, byte[] row, Column col, long amount) {
+ *     Bytes val =  tx.get(Bytes.of(row), col);
+ *     if(val == null)
+ *       return;   
+ *     long balance = Long.parseLong(val.toString());
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * With {@link TypedTransactionBase} if no default value is supplied, then the null is passed
+ * through.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     Long balance =  tx.get().row(r).col(c).toLong();
+ *     if(balance == null)
+ *       return;   
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Defaulted maps</h3>
+ * 
+ * <p>
+ * The operations that return maps, return defaulted maps which make it easy to specify defaults and
+ * avoid null.
+ * </p>
+ * 
+ * <pre>
+ * {@code
+ *   // pretend this method has curly braces.  javadoc has issues with less than.
+ * 
+ *   void process(TypedTransaction tx, byte[] r, Column c1, Column c2, Column c3, long amount)
+ * 
+ *     Map<Column, Value> columns = tx.get().row(r).columns(c1,c2,c3);
+ *     
+ *     // If c1 does not exist in map, a Value that wraps null will be returned.
+ *     // When c1 does not exist val1 will be set to null and no NPE will be thrown.
+ *     String val1 = columns.get(c1).toString();
+ *     
+ *     // If c2 does not exist in map, then val2 will be set to empty string.
+ *     String val2 = columns.get(c2).toString("");
+ *     
+ *     // If c3 does not exist in map, then val9 will be set to 9.
+ *     Long val3 = columns.get(c3).toLong(9);
+ * }
+ * </pre>
+ * 
+ * <p>
+ * This also applies to getting sets of rows.
+ * </p>
+ * 
+ * <pre>
+ * {@code
+ *   // pretend this method has curly braces.  javadoc has issues with less than.
+ * 
+ *   void process(TypedTransaction tx, List<String> rows, Column c1, Column c2, Column c3,
+ *     long amount)
+ * 
+ *     Map<String,Map<Column,Value>> rowCols =
+ *        tx.get().rowsString(rows).columns(c1,c2,c3).toStringMap();
+ *     
+ *     // this will set val1 to null if row does not exist in map and/or column does not
+ *     // exist in child map
+ *     String val1 = rowCols.get("row1").get(c1).toString();
+ * }
+ * </pre>
+ *
+ * @since 1.0.0
+ */
+public class TypeLayer {
+
+  private Encoder encoder;
+
+  static class Data {
+    Bytes row;
+    Bytes family;
+    Bytes qual;
+    Bytes vis;
+
+    Column getCol() {
+      if (qual == null) {
+        return new Column(family);
+      } else if (vis == null) {
+        return new Column(family, qual);
+      } else {
+        return new Column(family, qual, vis);
+      }
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class RowMethods<R> {
+
+    abstract R create(Data data);
+
+    public R row(String row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(int row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(long row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(byte[] row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(ByteBuffer row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(Bytes row) {
+      Data data = new Data();
+      data.row = row;
+      R result = create(data);
+      return result;
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class SimpleFamilyMethods<R1> {
+
+    protected Data data;
+
+    SimpleFamilyMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R1 create1(Data data);
+
+    public R1 fam(String family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(int family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(long family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(byte[] family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(ByteBuffer family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(Bytes family) {
+      data.family = family;
+      return create1(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class FamilyMethods<R1, R2> extends SimpleFamilyMethods<R1> {
+
+    FamilyMethods(Data data) {
+      super(data);
+    }
+
+    abstract R2 create2(Data data);
+
+    public R2 col(Column col) {
+      data.family = col.getFamily();
+      data.qual = col.getQualifier();
+      data.vis = col.getVisibility();
+      return create2(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class QualifierMethods<R> {
+
+    protected Data data;
+
+    QualifierMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R create(Data data);
+
+    public R qual(String qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(int qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(long qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(byte[] qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(ByteBuffer qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(Bytes qualifier) {
+      data.qual = qualifier;
+      return create(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public static class VisibilityMethods {
+
+    private Data data;
+
+    public VisibilityMethods(Data data) {
+      this.data = data;
+    }
+
+    public Column vis() {
+      return new Column(data.family, data.qual);
+    }
+
+    public Column vis(String cv) {
+      return vis(Bytes.of(cv));
+    }
+
+    public Column vis(Bytes cv) {
+      return new Column(data.family, data.qual, cv);
+    }
+
+    public Column vis(ByteBuffer cv) {
+      return vis(Bytes.of(cv));
+    }
+
+    public Column vis(byte[] cv) {
+      return vis(Bytes.of(cv));
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class CQB extends QualifierMethods<VisibilityMethods> {
+    CQB(Data data) {
+      super(data);
+    }
+
+    @Override
+    VisibilityMethods create(Data data) {
+      return new VisibilityMethods(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class CFB extends SimpleFamilyMethods<CQB> {
+    CFB() {
+      super(new Data());
+    }
+
+    @Override
+    CQB create1(Data data) {
+      return new CQB(data);
+    }
+  }
+
+  public TypeLayer(Encoder encoder) {
+    this.encoder = encoder;
+  }
+
+  /**
+   * Initiates the chain of calls needed to build a column.
+   * 
+   * @return a column builder
+   */
+  public CFB bc() {
+    return new CFB();
+  }
+
+  public TypedSnapshot wrap(Snapshot snap) {
+    return new TypedSnapshot(snap, encoder, this);
+  }
+
+  public TypedTransactionBase wrap(TransactionBase tx) {
+    return new TypedTransactionBase(tx, encoder, this);
+  }
+
+  public TypedTransaction wrap(Transaction tx) {
+    return new TypedTransaction(tx, encoder, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java
new file mode 100644
index 0000000..be5625c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+
+/**
+ * A {@link Loader} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedLoader implements Loader {
+
+  private final TypeLayer tl;
+
+  public TypedLoader() {
+    tl = new TypeLayer(new StringEncoder());
+  }
+
+  public TypedLoader(TypeLayer tl) {
+    this.tl = tl;
+  }
+
+  @Override
+  public void load(TransactionBase tx, Context context) throws Exception {
+    load(tl.wrap(tx), context);
+  }
+
+  public abstract void load(TypedTransactionBase tx, Context context) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java
new file mode 100644
index 0000000..799ed50
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * An {@link AbstractObserver} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedObserver extends AbstractObserver {
+
+  private final TypeLayer tl;
+
+  public TypedObserver() {
+    tl = new TypeLayer(new StringEncoder());
+  }
+
+  public TypedObserver(TypeLayer tl) {
+    this.tl = tl;
+  }
+
+  @Override
+  public void process(TransactionBase tx, Bytes row, Column col) {
+    process(tl.wrap(tx), row, col);
+  }
+
+  public abstract void process(TypedTransactionBase tx, Bytes row, Column col);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java
new file mode 100644
index 0000000..033d4de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+/**
+ * A {@link Snapshot} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshot extends TypedSnapshotBase implements Snapshot {
+
+  private final Snapshot closeSnapshot;
+
+  TypedSnapshot(Snapshot snapshot, Encoder encoder, TypeLayer tl) {
+    super(snapshot, encoder, tl);
+    closeSnapshot = snapshot;
+  }
+
+  @Override
+  public void close() {
+    closeSnapshot.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java
new file mode 100644
index 0000000..47722bd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.DefaultedMap;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.types.TypeLayer.Data;
+import org.apache.fluo.recipes.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.types.TypeLayer.RowMethods;
+
+// TODO need to refactor column to use Encoder
+
+/**
+ * A {@link SnapshotBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshotBase implements SnapshotBase {
+
+  private SnapshotBase snapshot;
+  private Encoder encoder;
+  private TypeLayer tl;
+
+  /**
+   * @since 1.0.0
+   */
+  public class VisibilityMethods extends Value {
+
+    public VisibilityMethods(Data data) {
+      super(data);
+    }
+
+    public Value vis(Bytes cv) {
+      data.vis = cv;
+      return new Value(data);
+    }
+
+    public Value vis(byte[] cv) {
+      data.vis = Bytes.of(cv);
+      return new Value(data);
+    }
+
+    public Value vis(ByteBuffer bb) {
+      data.vis = Bytes.of(bb);
+      return new Value(data);
+    }
+
+    public Value vis(String cv) {
+      data.vis = Bytes.of(cv);
+      return new Value(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class Value {
+    private Bytes bytes;
+    private boolean gotBytes = false;
+    protected Data data;
+
+    public Bytes getBytes() {
+      if (!gotBytes) {
+        try {
+          bytes = snapshot.get(data.row, data.getCol());
+          gotBytes = true;
+        } catch (Exception e) {
+          if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          }
+          throw new RuntimeException(e);
+        }
+      }
+
+      return bytes;
+    }
+
+    private Value(Bytes bytes) {
+      this.bytes = bytes;
+      this.gotBytes = true;
+    }
+
+    private Value(Data data) {
+      this.data = data;
+      this.gotBytes = false;
+    }
+
+    public Integer toInteger() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeInteger(getBytes());
+    }
+
+    public int toInteger(int defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeInteger(getBytes());
+    }
+
+    public Long toLong() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeLong(getBytes());
+    }
+
+    public long toLong(long defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeLong(getBytes());
+    }
+
+    @Override
+    public String toString() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeString(getBytes());
+    }
+
+    public String toString(String defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeString(getBytes());
+    }
+
+    public Float toFloat() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeFloat(getBytes());
+    }
+
+    public float toFloat(float defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeFloat(getBytes());
+    }
+
+    public Double toDouble() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeDouble(getBytes());
+    }
+
+    public double toDouble(double defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeDouble(getBytes());
+    }
+
+    public Boolean toBoolean() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeBoolean(getBytes());
+    }
+
+    public boolean toBoolean(boolean defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeBoolean(getBytes());
+    }
+
+    public byte[] toBytes() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return getBytes().toArray();
+    }
+
+    public byte[] toBytes(byte[] defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return getBytes().toArray();
+    }
+
+    public ByteBuffer toByteBuffer() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return ByteBuffer.wrap(getBytes().toArray());
+    }
+
+    public ByteBuffer toByteBuffer(ByteBuffer defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return toByteBuffer();
+    }
+
+    @Override
+    public int hashCode() {
+      if (getBytes() == null) {
+        return 0;
+      }
+
+      return getBytes().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof Value) {
+        Value ov = (Value) o;
+        if (getBytes() == null) {
+          return ov.getBytes() == null;
+        } else {
+          return getBytes().equals(ov.getBytes());
+        }
+      }
+
+      return false;
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> {
+
+    ValueQualifierBuilder(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    VisibilityMethods create(Data data) {
+      return new VisibilityMethods(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> {
+
+    ValueFamilyMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    ValueQualifierBuilder create1(Data data) {
+      return new ValueQualifierBuilder(data);
+    }
+
+    @Override
+    Value create2(Data data) {
+      return new Value(data);
+    }
+
+    public Map<Column, Value> columns(Set<Column> columns) {
+      try {
+        return wrap(snapshot.get(data.row, columns));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public Map<Column, Value> columns(Column... columns) {
+      try {
+        return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns))));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MapConverter {
+    private Collection<Bytes> rows;
+    private Set<Column> columns;
+
+    public MapConverter(Collection<Bytes> rows, Set<Column> columns) {
+      this.rows = rows;
+      this.columns = columns;
+    }
+
+    private Map<Bytes, Map<Column, Bytes>> getInput() {
+      try {
+        return snapshot.get(rows, columns);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private Map wrap2(Map m) {
+      return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value(
+          (Bytes) null))));
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<String, Map<Column, Value>> toStringMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<String, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Long, Map<Column, Value>> toLongMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Long, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Integer, Map<Column, Value>> toIntegerMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Integer, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Bytes, Map<Column, Value>> toBytesMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Bytes, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(rowEntry.getKey(), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ColumnsMethods {
+    private Collection<Bytes> rows;
+
+    public ColumnsMethods(Collection<Bytes> rows) {
+      this.rows = rows;
+    }
+
+    public MapConverter columns(Set<Column> columns) {
+      return new MapConverter(rows, columns);
+    }
+
+    public MapConverter columns(Column... columns) {
+      return columns(new HashSet<>(Arrays.asList(columns)));
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueRowMethods extends RowMethods<ValueFamilyMethods> {
+
+    ValueRowMethods() {
+      tl.super();
+    }
+
+    @Override
+    ValueFamilyMethods create(Data data) {
+      return new ValueFamilyMethods(data);
+    }
+
+    public ColumnsMethods rows(Collection<Bytes> rows) {
+      return new ColumnsMethods(rows);
+    }
+
+    public ColumnsMethods rows(Bytes... rows) {
+      return new ColumnsMethods(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(String... rows) {
+      return rowsString(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(Collection<String> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (String row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsLong(Long... rows) {
+      return rowsLong(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsLong(Collection<Long> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (Long row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsInteger(Integer... rows) {
+      return rowsInteger(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsInteger(Collection<Integer> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (Integer row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsBytes(byte[]... rows) {
+      return rowsBytes(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsBytes(Collection<byte[]> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (byte[] row : rows) {
+        conv.add(Bytes.of(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) {
+      return rowsByteBuffers(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (ByteBuffer row : rows) {
+        conv.add(Bytes.of(row));
+      }
+
+      return rows(conv);
+    }
+
+  }
+
+  TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) {
+    this.snapshot = snapshot;
+    this.encoder = encoder;
+    this.tl = tl;
+  }
+
+  @Override
+  public Bytes get(Bytes row, Column column) {
+    return snapshot.get(row, column);
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    return snapshot.get(row, columns);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+    return snapshot.get(rowColumns);
+  }
+
+  @Override
+  public RowIterator get(ScannerConfiguration config) {
+    return snapshot.get(config);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+    return snapshot.get(rows, columns);
+  }
+
+  public ValueRowMethods get() {
+    return new ValueRowMethods();
+  }
+
+  @SuppressWarnings({"unchecked"})
+  private Map<Column, Value> wrap(Map<Column, Bytes> map) {
+    Map<Column, Value> ret = Maps.transformValues(map, new Function<Bytes, Value>() {
+      @Override
+      public Value apply(Bytes input) {
+        return new Value(input);
+      }
+    });
+
+    return Collections.unmodifiableMap(DefaultedMap.decorate(ret, new Value((Bytes) null)));
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    return snapshot.getStartTimestamp();
+  }
+
+  @Override
+  public String gets(String row, Column column) {
+    return snapshot.gets(row, column);
+  }
+
+  @Override
+  public Map<Column, String> gets(String row, Set<Column> columns) {
+    return snapshot.gets(row, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+    return snapshot.gets(rows, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+    return snapshot.gets(rowColumns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java
new file mode 100644
index 0000000..1e22cd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * A {@link Transaction} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransaction extends TypedTransactionBase implements Transaction {
+
+  private final Transaction closeTx;
+
+  @VisibleForTesting
+  protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) {
+    super(tx, encoder, tl);
+    closeTx = tx;
+  }
+
+  @Override
+  public void commit() throws CommitException {
+    closeTx.commit();
+  }
+
+  @Override
+  public void close() {
+    closeTx.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java
new file mode 100644
index 0000000..3247ba9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.recipes.types.TypeLayer.Data;
+import org.apache.fluo.recipes.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.types.TypeLayer.RowMethods;
+
+/**
+ * A {@link TransactionBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase {
+
+  private final TransactionBase tx;
+  private final Encoder encoder;
+  private final TypeLayer tl;
+
+  /**
+   * @since 1.0.0
+   */
+  public class Mutator {
+
+    private boolean set = false;
+    protected Data data;
+
+    public Mutator(Data data) {
+      this.data = data;
+    }
+
+    void checkNotSet() {
+      if (set) {
+        throw new IllegalStateException("Already set value");
+      }
+    }
+
+    public void set(Bytes bytes) throws AlreadySetException {
+      checkNotSet();
+      tx.set(data.row, data.getCol(), bytes);
+      set = true;
+    }
+
+    public void set(String s) throws AlreadySetException {
+      set(encoder.encode(s));
+    }
+
+    public void set(int i) throws AlreadySetException {
+      set(encoder.encode(i));
+    }
+
+    public void set(long l) throws AlreadySetException {
+      set(encoder.encode(l));
+    }
+
+    public void set(float f) throws AlreadySetException {
+      set(encoder.encode(f));
+    }
+
+    public void set(double d) throws AlreadySetException {
+      set(encoder.encode(d));
+    }
+
+    public void set(boolean b) throws AlreadySetException {
+      set(encoder.encode(b));
+    }
+
+    public void set(byte[] ba) throws AlreadySetException {
+      set(Bytes.of(ba));
+    }
+
+    public void set(ByteBuffer bb) throws AlreadySetException {
+      set(Bytes.of(bb));
+    }
+
+    /**
+     * Set an empty value
+     */
+    public void set() throws AlreadySetException {
+      set(Bytes.EMPTY);
+    }
+
+    /**
+     * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero.
+     *
+     * @param i Integer increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     */
+    public void increment(int i) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      int v = 0;
+      if (val != null) {
+        v = encoder.decodeInteger(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + i));
+    }
+
+    /**
+     * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero.
+     *
+     * @param l Long increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     */
+    public void increment(long l) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      long v = 0;
+      if (val != null) {
+        v = encoder.decodeLong(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + l));
+    }
+
+    public void delete() throws AlreadySetException {
+      checkNotSet();
+      tx.delete(data.row, data.getCol());
+      set = true;
+    }
+
+    public void weaklyNotify() {
+      checkNotSet();
+      tx.setWeakNotification(data.row, data.getCol());
+      set = true;
+    }
+
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class VisibilityMutator extends Mutator {
+
+    public VisibilityMutator(Data data) {
+      super(data);
+    }
+
+    public Mutator vis(String cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+
+    public Mutator vis(Bytes cv) {
+      checkNotSet();
+      data.vis = cv;
+      return new Mutator(data);
+    }
+
+    public Mutator vis(byte[] cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+
+    public Mutator vis(ByteBuffer cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> {
+
+    MutatorQualifierMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    VisibilityMutator create(Data data) {
+      return new VisibilityMutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> {
+
+    MutatorFamilyMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    MutatorQualifierMethods create1(Data data) {
+      return new MutatorQualifierMethods(data);
+    }
+
+    @Override
+    Mutator create2(Data data) {
+      return new Mutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> {
+
+    MutatorRowMethods() {
+      tl.super();
+    }
+
+    @Override
+    MutatorFamilyMethods create(Data data) {
+      return new MutatorFamilyMethods(data);
+    }
+
+  }
+
+  @VisibleForTesting
+  protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) {
+    super(tx, encoder, tl);
+    this.tx = tx;
+    this.encoder = encoder;
+    this.tl = tl;
+  }
+
+  public MutatorRowMethods mutate() {
+    return new MutatorRowMethods();
+  }
+
+  @Override
+  public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+    tx.set(row, col, value);
+  }
+
+  @Override
+  public void set(String row, Column col, String value) throws AlreadySetException {
+    tx.set(row, col, value);
+  }
+
+  @Override
+  public void setWeakNotification(Bytes row, Column col) {
+    tx.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void setWeakNotification(String row, Column col) {
+    tx.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void delete(Bytes row, Column col) throws AlreadySetException {
+    tx.delete(row, col);
+  }
+
+  @Override
+  public void delete(String row, Column col) {
+    tx.delete(row, col);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java
new file mode 100644
index 0000000..3cdc9ea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+public class MockSnapshot extends MockSnapshotBase implements Snapshot {
+
+  MockSnapshot(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void close() {
+    // no resources need to be closed
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java
new file mode 100644
index 0000000..93372dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.core.impl.TxStringUtil;
+
+public class MockSnapshotBase implements SnapshotBase {
+
+  final Map<Bytes, Map<Column, Bytes>> getData;
+
+  /**
+   * Initializes {@link #getData} using {@link #toRCVM(String...)}
+   */
+  MockSnapshotBase(String... entries) {
+    getData = toRCVM(entries);
+  }
+
+  @Override
+  public Bytes get(Bytes row, Column column) {
+    Map<Column, Bytes> cols = getData.get(row);
+    if (cols != null) {
+      return cols.get(column);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    Map<Column, Bytes> ret = new HashMap<>();
+    Map<Column, Bytes> cols = getData.get(row);
+    if (cols != null) {
+      for (Column column : columns) {
+        Bytes val = cols.get(column);
+        if (val != null) {
+          ret.put(column, val);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+
+    Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+    for (Bytes row : rows) {
+      Map<Column, Bytes> colMap = get(row, columns);
+      if (colMap != null && colMap.size() > 0) {
+        ret.put(row, colMap);
+      }
+    }
+
+    return ret;
+  }
+
+  @Override
+  public RowIterator get(ScannerConfiguration config) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * toRCVM stands for "To Row Column Value Map". This is a convenience function that takes strings
+   * of the format {@code <row>,<col fam>:<col qual>[:col vis],
+   * <value>} and generates a row, column, value map.
+   */
+  public static Map<Bytes, Map<Column, Bytes>> toRCVM(String... entries) {
+    Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+    for (String entry : entries) {
+      String[] rcv = entry.split(",");
+      if (rcv.length != 3 && !(rcv.length == 2 && entry.trim().endsWith(","))) {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Bytes row = Bytes.of(rcv[0]);
+      String[] colFields = rcv[1].split(":");
+
+      Column col;
+      if (colFields.length == 3) {
+        col = new Column(colFields[0], colFields[1], colFields[2]);
+      } else if (colFields.length == 2) {
+        col = new Column(colFields[0], colFields[1]);
+      } else {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Bytes val;
+      if (rcv.length == 2) {
+        val = Bytes.EMPTY;
+      } else {
+        val = Bytes.of(rcv[2]);
+      }
+
+      Map<Column, Bytes> cols = ret.get(row);
+      if (cols == null) {
+        cols = new HashMap<>();
+        ret.put(row, cols);
+      }
+
+      cols.put(col, val);
+    }
+    return ret;
+  }
+
+  /**
+   * toRCM stands for "To Row Column Map". This is a convenience function that takes strings of the
+   * format {@code <row>,<col fam>:<col qual>[:col vis]} and generates a row, column map.
+   */
+  public static Map<Bytes, Set<Column>> toRCM(String... entries) {
+    Map<Bytes, Set<Column>> ret = new HashMap<>();
+
+    for (String entry : entries) {
+      String[] rcv = entry.split(",");
+      if (rcv.length != 2) {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis] but saw : " + entry);
+      }
+
+      Bytes row = Bytes.of(rcv[0]);
+      String[] colFields = rcv[1].split(":");
+
+      Column col;
+      if (colFields.length == 3) {
+        col = new Column(colFields[0], colFields[1], colFields[2]);
+      } else if (colFields.length == 2) {
+        col = new Column(colFields[0], colFields[1]);
+      } else {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Set<Column> cols = ret.get(row);
+      if (cols == null) {
+        cols = new HashSet<>();
+        ret.put(row, cols);
+      }
+
+      cols.add(col);
+    }
+    return ret;
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    throw new UnsupportedOperationException();
+  }
+
+
+  @Override
+  public String gets(String row, Column column) {
+    return TxStringUtil.gets(this, row, column);
+  }
+
+  @Override
+  public Map<Column, String> gets(String row, Set<Column> columns) {
+    return TxStringUtil.gets(this, row, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+    return TxStringUtil.gets(this, rows, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+    return TxStringUtil.gets(this, rowColumns);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java
new file mode 100644
index 0000000..f187234
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+public class MockTransaction extends MockTransactionBase implements Transaction {
+
+  MockTransaction(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void commit() throws CommitException {
+    // does nothing
+  }
+
+  @Override
+  public void close() {
+    // no resources to close
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java
new file mode 100644
index 0000000..07a95e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+
+/**
+ * A very simple implementation of {@link TransactionBase} used for testing. All reads are serviced
+ * from {@link #getData}. Updates are stored in {@link #setData}, {@link #deletes}, or
+ * {@link #weakNotifications} depending on the update type.
+ */
+public class MockTransactionBase extends MockSnapshotBase implements TransactionBase {
+
+  final Map<Bytes, Map<Column, Bytes>> setData = new HashMap<>();
+  final Map<Bytes, Set<Column>> deletes = new HashMap<>();
+  final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
+
+  MockTransactionBase(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void setWeakNotification(Bytes row, Column col) {
+    Set<Column> cols = weakNotifications.get(row);
+    if (cols == null) {
+      cols = new HashSet<>();
+      weakNotifications.put(row, cols);
+    }
+
+    cols.add(col);
+  }
+
+  @Override
+  public void set(Bytes row, Column col, Bytes value) {
+    Map<Column, Bytes> cols = setData.get(row);
+    if (cols == null) {
+      cols = new HashMap<>();
+      setData.put(row, cols);
+    }
+
+    cols.put(col, value);
+  }
+
+  @Override
+  public void delete(Bytes row, Column col) {
+    Set<Column> cols = deletes.get(row);
+    if (cols == null) {
+      cols = new HashSet<>();
+      deletes.put(row, cols);
+    }
+
+    cols.add(col);
+  }
+
+  @Override
+  public void setWeakNotification(String row, Column col) {
+    setWeakNotification(Bytes.of(row), col);
+  }
+
+  @Override
+  public void set(String row, Column col, String value) throws AlreadySetException {
+    set(Bytes.of(row), col, Bytes.of(value));
+  }
+
+  @Override
+  public void delete(String row, Column col) {
+    delete(Bytes.of(row), col);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/777db493/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java
new file mode 100644
index 0000000..1139481
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.types.TypedSnapshotBase.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeLayerTest {
+
+  @Test
+  public void testColumns() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Map<Column, Value> results =
+        ttx.get().row("r2")
+            .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7")));
+
+    Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+    Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+    Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+    Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+
+    Assert.assertEquals(1, results.size());
+
+    results =
+        ttx.get()
+            .row("r2")
+            .columns(
+                ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2",
+                    "8")));
+
+    Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+    Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+    Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+    Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+    Assert.assertEquals(13, (int) results.get(new Column("cf2", "8")).toInteger());
+    Assert.assertEquals(13, results.get(new Column("cf2", "8")).toInteger(0));
+
+    Assert.assertEquals(2, results.size());
+
+    // test var args
+    Map<Column, Value> results2 =
+        ttx.get().row("r2")
+            .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"));
+    Assert.assertEquals(results, results2);
+  }
+
+  @Test
+  public void testVis() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt = new MockTransactionBase("r1,cf1:cq1:A,v1", "r1,cf1:cq2:A&B,v2");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes())
+        .toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A"))
+        .toString());
+    Assert.assertEquals("v1",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString());
+
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString());
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+        .toString());
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+        .toString());
+    Assert.assertNull("v1",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+            .toString());
+
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3"));
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+        .toString("v3"));
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+        .toString("v3"));
+    Assert.assertEquals(
+        "v3",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+            .toString("v3"));
+
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).set(5);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).set(7);
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1:A&B,3", "r1,cf1:cq1:A&C,4",
+        "r1,cf1:cq1:A&D,5", "r1,cf1:cq1:A&F,7"), tt.setData);
+    tt.setData.clear();
+
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).delete();
+
+    Assert.assertEquals(MockTransactionBase.toRCM("r1,cf1:cq1:A&B", "r1,cf1:cq1:A&C",
+        "r1,cf1:cq1:A&D", "r1,cf1:cq1:A&F"), tt.deletes);
+    tt.deletes.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.weakNotifications.size());
+
+  }
+
+  @Test
+  public void testBuildColumn() {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes())
+        .vis());
+    Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(Bytes.of("5")).qual(Bytes.of("7")).vis());
+    Assert.assertEquals(new Column("5", "7"),
+        tl.bc().fam(ByteBuffer.wrap("5".getBytes())).qual(ByteBuffer.wrap("7".getBytes())).vis());
+
+    Assert.assertEquals(new Column("f0", "q0", "A&B"),
+        tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis("A&B"));
+    Assert.assertEquals(new Column("f0", "q0", "A&C"),
+        tl.bc().fam("f0").qual("q0").vis("A&C".getBytes()));
+    Assert.assertEquals(new Column("5", "7", "A&D"), tl.bc().fam(5).qual(7).vis(Bytes.of("A&D")));
+    Assert.assertEquals(new Column("5", "7", "A&D"),
+        tl.bc().fam(5).qual(7).vis(ByteBuffer.wrap("A&D".getBytes())));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockSnapshot ms =
+        new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20",
+            "r3,cf3:cq3,28.195", "r4,cf4:cq4,true");
+
+    TypedSnapshot tts = tl.wrap(ms);
+
+    Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString());
+    Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString("b"));
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString());
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString("b"));
+    Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString());
+    Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString("b"));
+    Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString());
+    Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString("b"));
+
+    // try converting to different types
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString());
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString("b"));
+    Assert.assertEquals((Integer) 13, tts.get().row("r2").fam("cf2").qual(8).toInteger());
+    Assert.assertEquals(13, tts.get().row("r2").fam("cf2").qual(8).toInteger(14));
+    Assert.assertEquals((Long) 13l, tts.get().row("r2").fam("cf2").qual(8).toLong());
+    Assert.assertEquals(13l, tts.get().row("r2").fam("cf2").qual(8).toLong(14l));
+    Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes()));
+    Assert.assertEquals("13",
+        new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes())));
+    Assert
+        .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes()));
+    Assert.assertEquals("13",
+        new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes())));
+    Assert.assertEquals("13",
+        Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString());
+    Assert.assertEquals(
+        "13",
+        Bytes.of(
+            tts.get().row("r2").col(new Column("cf2", "8"))
+                .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+    // test non-existent
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger());
+    Assert.assertEquals(14, tts.get().row("r2").fam("cf3").qual(8).toInteger(14));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toLong());
+    Assert.assertEquals(14l, tts.get().row("r2").fam("cf3").qual(8).toLong(14l));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toString());
+    Assert.assertEquals("14", tts.get().row("r2").fam("cf3").qual(8).toString("14"));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toBytes());
+    Assert.assertEquals("14",
+        new String(tts.get().row("r2").fam("cf3").qual(8).toBytes("14".getBytes())));
+    Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toBytes());
+    Assert.assertEquals("14",
+        new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes())));
+    Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer());
+    Assert.assertEquals(
+        "14",
+        Bytes.of(
+            tts.get().row("r2").col(new Column("cf3", "8"))
+                .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+    // test float & double
+    Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat());
+    Assert.assertEquals(28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat(39.383f), 0.0);
+    Assert.assertEquals((Double) 28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble());
+    Assert.assertEquals(28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble(39.383d), 0.0);
+
+    // test boolean
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+
+    // try different types for row
+    Assert.assertEquals("20", tts.get().row(13).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17")
+        .toString());
+
+    // try different types for cf
+    Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17")
+        .toString());
+
+    // try different types for cq
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes()))
+        .toString());
+
+    ms.close();
+    tts.close();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    // test increments data
+    ttx.mutate().row("13").fam("9").qual("17").increment(1);
+    ttx.mutate().row("13").fam("9").qual(18).increment(2);
+    ttx.mutate().row("13").fam("9").qual(19l).increment(3);
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4);
+    ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5); // increment non existent
+    ttx.mutate().row("13").col(new Column("9", "22")).increment(6); // increment non existent
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7); // increment
+                                                                                         // non
+                                                                                         // existent
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+        "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+    tt.setData.clear();
+
+    // test increments long
+    ttx.mutate().row("13").fam("9").qual("17").increment(1l);
+    ttx.mutate().row("13").fam("9").qual(18).increment(2l);
+    ttx.mutate().row("13").fam("9").qual(19l).increment(3l);
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4l);
+    ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5l); // increment non existent
+    ttx.mutate().row("13").col(new Column("9", "22")).increment(6l); // increment non existent
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l); // increment
+                                                                                          // non
+                                                                                          // existent
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+        "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+    tt.setData.clear();
+
+    // test setting data
+    ttx.mutate().row("13").fam("9").qual("16").set();
+    ttx.mutate().row("13").fam("9").qual("17").set(3);
+    ttx.mutate().row("13").fam("9").qual(18).set(4l);
+    ttx.mutate().row("13").fam("9").qual(19l).set("5");
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).set("6".getBytes());
+    ttx.mutate().row("13").col(new Column("9", "21")).set("7".getBytes());
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes()))
+        .set(ByteBuffer.wrap("8".getBytes()));
+    ttx.mutate().row("13").fam("9").qual("23").set(2.54f);
+    ttx.mutate().row("13").fam("9").qual("24").set(-6.135d);
+    ttx.mutate().row("13").fam("9").qual("25").set(false);
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4",
+        "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135",
+        "13,9:25,false"), tt.setData);
+    tt.setData.clear();
+
+    // test deleting data
+    ttx.mutate().row("13").fam("9").qual("17").delete();
+    ttx.mutate().row("13").fam("9").qual(18).delete();
+    ttx.mutate().row("13").fam("9").qual(19l).delete();
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).delete();
+    ttx.mutate().row("13").col(new Column("9", "21")).delete();
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete();
+
+    Assert
+        .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+            "13,9:21", "13,9:22"), tt.deletes);
+    tt.deletes.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.weakNotifications.size());
+
+    // test weak notifications
+    ttx.mutate().row("13").fam("9").qual("17").weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(18).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(19l).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).weaklyNotify();
+    ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify();
+
+    Assert
+        .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+            "13,9:21", "13,9:22"), tt.weakNotifications);
+    tt.weakNotifications.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.deletes.size());
+  }
+
+  @Test
+  public void testMultiRow() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4",
+            "13,cf1:cq1,5", "13,cf1:cq2,6");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Bytes br1 = Bytes.of("11");
+    Bytes br2 = Bytes.of("12");
+    Bytes br3 = Bytes.of("13");
+
+    Column c1 = new Column("cf1", "cq1");
+    Column c2 = new Column("cf1", "cq2");
+
+    Map<Bytes, Map<Column, Value>> map1 =
+        ttx.get().rows(Arrays.asList(br1, br2)).columns(c1).toBytesMap();
+
+    Assert.assertEquals(map1, ttx.get().rows(br1, br2).columns(c1).toBytesMap());
+
+    Assert.assertEquals("1", map1.get(br1).get(c1).toString());
+    Assert.assertEquals("1", map1.get(br1).get(c1).toString("5"));
+    Assert.assertEquals((Long) (1l), map1.get(br1).get(c1).toLong());
+    Assert.assertEquals(1l, map1.get(br1).get(c1).toLong(5));
+    Assert.assertEquals((Integer) (1), map1.get(br1).get(c1).toInteger());
+    Assert.assertEquals(1, map1.get(br1).get(c1).toInteger(5));
+
+    Assert.assertEquals("5", map1.get(br3).get(c1).toString("5"));
+    Assert.assertNull(map1.get(br3).get(c1).toString());
+    Assert.assertEquals(5l, map1.get(br3).get(c1).toLong(5l));
+    Assert.assertNull(map1.get(br3).get(c1).toLong());
+    Assert.assertEquals(5, map1.get(br1).get(c2).toInteger(5));
+    Assert.assertNull(map1.get(br1).get(c2).toInteger());
+
+    Assert.assertEquals(2, map1.size());
+    Assert.assertEquals(1, map1.get(br1).size());
+    Assert.assertEquals(1, map1.get(br2).size());
+    Assert.assertEquals("3", map1.get(br2).get(c1).toString());
+
+    Map<String, Map<Column, Value>> map2 =
+        ttx.get().rowsString(Arrays.asList("11", "13")).columns(c1).toStringMap();
+
+    Assert.assertEquals(map2, ttx.get().rowsString("11", "13").columns(c1).toStringMap());
+
+    Assert.assertEquals(2, map2.size());
+    Assert.assertEquals(1, map2.get("11").size());
+    Assert.assertEquals(1, map2.get("13").size());
+    Assert.assertEquals((Long) (1l), map2.get("11").get(c1).toLong());
+    Assert.assertEquals(5l, map2.get("13").get(c1).toLong(6));
+
+    Map<Long, Map<Column, Value>> map3 =
+        ttx.get().rowsLong(Arrays.asList(11l, 13l)).columns(c1).toLongMap();
+
+    Assert.assertEquals(map3, ttx.get().rowsLong(11l, 13l).columns(c1).toLongMap());
+
+    Assert.assertEquals(2, map3.size());
+    Assert.assertEquals(1, map3.get(11l).size());
+    Assert.assertEquals(1, map3.get(13l).size());
+    Assert.assertEquals((Long) (1l), map3.get(11l).get(c1).toLong());
+    Assert.assertEquals(5l, map3.get(13l).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map4 =
+        ttx.get().rowsInteger(Arrays.asList(11, 13)).columns(c1).toIntegerMap();
+
+    Assert.assertEquals(map4, ttx.get().rowsInteger(11, 13).columns(c1).toIntegerMap());
+
+    Assert.assertEquals(2, map4.size());
+    Assert.assertEquals(1, map4.get(11).size());
+    Assert.assertEquals(1, map4.get(13).size());
+    Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map5 =
+        ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1)
+            .toIntegerMap();
+
+    Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1)
+        .toIntegerMap());
+
+    Assert.assertEquals(2, map5.size());
+    Assert.assertEquals(1, map5.get(11).size());
+    Assert.assertEquals(1, map5.get(13).size());
+    Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map6 =
+        ttx.get()
+            .rowsByteBuffers(
+                Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())))
+            .columns(c1).toIntegerMap();
+
+    Assert.assertEquals(
+        map6,
+        ttx.get()
+            .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))
+            .columns(c1).toIntegerMap());
+
+    Assert.assertEquals(2, map6.size());
+    Assert.assertEquals(1, map6.get(11).size());
+    Assert.assertEquals(1, map6.get(13).size());
+    Assert.assertEquals((Long) (1l), map6.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map6.get(13).get(c1).toLong(6));
+
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Assert.assertEquals(Bytes.of("12"), ttx.get(Bytes.of("r2"), new Column("cf2", "7")));
+    Assert.assertNull(ttx.get(Bytes.of("r2"), new Column("cf2", "9")));
+
+    Map<Column, Bytes> map =
+        ttx.get(Bytes.of("r2"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+    Assert.assertEquals(2, map.size());
+    Assert.assertEquals("12", map.get(new Column("cf2", "7")).toString());
+    Assert.assertEquals("13", map.get(new Column("cf2", "8")).toString());
+
+    map = ttx.get(Bytes.of("r6"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+    Assert.assertEquals(0, map.size());
+
+    ttx.set(Bytes.of("r6"), new Column("cf2", "7"), Bytes.of("3"));
+    Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData);
+    tt.setData.clear();
+
+    Map<Bytes, Map<Column, Bytes>> map2 =
+        ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")),
+            ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8")));
+    Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2);
+
+    ttx.delete(Bytes.of("r6"), new Column("cf2", "7"));
+    Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:7"), tt.deletes);
+    tt.deletes.clear();
+
+    ttx.setWeakNotification(Bytes.of("r6"), new Column("cf2", "8"));
+    Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:8"), tt.weakNotifications);
+    tt.weakNotifications.clear();
+
+  }
+}



Mime
View raw message