fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [2/2] incubator-fluo-recipes git commit: Updated to use new scanner API from apache/incubator-fluo#639
Date Mon, 18 Jul 2016 18:00:49 GMT
Updated to use new scanner API from apache/incubator-fluo#639


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/18933f22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/18933f22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/18933f22

Branch: refs/heads/master
Commit: 18933f226b4b4adfc3e1e5487927b148583ce176
Parents: 22354d0
Author: Keith Turner <kturner@apache.org>
Authored: Fri Jul 15 15:59:21 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Jul 15 18:59:15 2016 -0400

----------------------------------------------------------------------
 .../fluo/recipes/core/export/ExportBucket.java  |  35 ++--
 .../fluo/recipes/core/map/CollisionFreeMap.java |  33 ++-
 .../transaction/RecordingTransactionBase.java   | 209 +++++++++++++++----
 .../recipes/core/types/TypedSnapshotBase.java   |  13 +-
 .../recipes/core/export/ExportTestBase.java     |  41 ++--
 .../fluo/recipes/core/map/BigUpdateIT.java      |  27 +--
 .../recipes/core/map/CollisionFreeMapIT.java    |  39 ++--
 .../transaction/RecordingTransactionTest.java   | 113 +++++-----
 .../recipes/core/types/MockSnapshotBase.java    |   5 +-
 .../apache/fluo/recipes/test/FluoITHelper.java  |  73 +++----
 pom.xml                                         |   2 +-
 11 files changed, 325 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
index cf6dfb4..c0dae48 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
@@ -15,19 +15,16 @@
 
 package org.apache.fluo.recipes.core.export;
 
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.Map.Entry;
 
 import com.google.common.base.Preconditions;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.recipes.core.impl.BucketUtil;
 import org.apache.fluo.recipes.core.types.StringEncoder;
 import org.apache.fluo.recipes.core.types.TypeLayer;
@@ -123,35 +120,29 @@ class ExportBucket {
   }
 
   public Iterator<ExportEntry> getExportIterator(Bytes continueRow) {
-    ScannerConfiguration sc = new ScannerConfiguration();
-
+    Span span;
     if (continueRow != null) {
       Span tmpSpan = Span.prefix(bucketRow);
       Span nextSpan =
           new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(),
               tmpSpan.isEndInclusive());
-      sc.setSpan(nextSpan);
+      span = nextSpan;
     } else {
-      sc.setSpan(Span.prefix(bucketRow));
+      span = Span.prefix(bucketRow);
     }
 
-    sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier());
-    RowIterator iter = ttx.get(sc);
+    CellScanner scanner = ttx.scanner().over(span).fetch(EXPORT_COL).build();
 
-    if (iter.hasNext()) {
-      return new ExportIterator(iter);
-    } else {
-      return Collections.<ExportEntry>emptySet().iterator();
-    }
+    return new ExportIterator(scanner);
   }
 
   private class ExportIterator implements Iterator<ExportEntry> {
 
-    private RowIterator rowIter;
+    private Iterator<RowColumnValue> rowIter;
     private Bytes lastRow;
 
-    public ExportIterator(RowIterator rowIter) {
-      this.rowIter = rowIter;
+    public ExportIterator(CellScanner scanner) {
+      this.rowIter = scanner.iterator();
     }
 
     @Override
@@ -161,8 +152,8 @@ class ExportBucket {
 
     @Override
     public ExportEntry next() {
-      Entry<Bytes, ColumnIterator> rowCol = rowIter.next();
-      Bytes row = rowCol.getKey();
+      RowColumnValue rowColVal = rowIter.next();
+      Bytes row = rowColVal.getRow();
 
       Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8);
       Bytes seqBytes = row.subSequence(row.length() - 8, row.length());
@@ -172,7 +163,7 @@ class ExportBucket {
       ee.key = keyBytes.toArray();
       ee.seq = decodeSeq(seqBytes);
       // TODO maybe leave as Bytes?
-      ee.value = rowCol.getValue().next().getValue().toArray();
+      ee.value = rowColVal.getValue().toArray();
 
       lastRow = row;
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 2fe4a7c..c6c0918 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -37,7 +37,6 @@ import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.BytesBuilder;
@@ -45,8 +44,6 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.recipes.core.common.TableOptimizations;
 import org.apache.fluo.recipes.core.common.RowRange;
 import org.apache.fluo.recipes.core.common.TransientRegistry;
@@ -113,7 +110,7 @@ public class CollisionFreeMap<K, V> {
 
     Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
 
-    ScannerConfiguration sc = new ScannerConfiguration();
+    Span span;
 
     if (nextKey != null) {
       Bytes startRow =
@@ -123,14 +120,14 @@ public class CollisionFreeMap<K, V> {
       Span nextSpan =
           new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
               tmpSpan.isEndInclusive());
-      sc.setSpan(nextSpan);
+      span = nextSpan;
     } else {
-      sc.setSpan(Span.prefix(ntfyRow));
+      span = Span.prefix(ntfyRow);
     }
 
-    sc.setSpan(Span.prefix(ntfyRow));
-    sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
-    RowIterator iter = tx.get(sc);
+    // TODO
+    span = Span.prefix(ntfyRow);
+    Iterator<RowColumnValue> iter = tx.scanner().over(span).fetch(UPDATE_COL).build().iterator();
 
     Map<Bytes, List<Bytes>> updates = new HashMap<>();
 
@@ -141,8 +138,8 @@ public class CollisionFreeMap<K, V> {
     if (iter.hasNext()) {
       Bytes lastKey = null;
       while (iter.hasNext() && approxMemUsed < bufferSize) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
+        RowColumnValue rcv = iter.next();
+        Bytes curRow = rcv.getRow();
 
         tx.delete(curRow, UPDATE_COL);
 
@@ -155,7 +152,7 @@ public class CollisionFreeMap<K, V> {
           updates.put(serializedKey, updateList);
         }
 
-        Bytes val = rowCol.getValue().next().getValue();
+        Bytes val = rcv.getValue();
         updateList.add(val);
 
         approxMemUsed += curRow.length();
@@ -163,8 +160,8 @@ public class CollisionFreeMap<K, V> {
       }
 
       if (iter.hasNext()) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
+        RowColumnValue rcv = iter.next();
+        Bytes curRow = rcv.getRow();
 
         // check if more updates for last key
         if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
@@ -293,15 +290,13 @@ public class CollisionFreeMap<K, V> {
     BytesBuilder rowBuilder = Bytes.newBuilder();
     rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
 
-    ScannerConfiguration sc = new ScannerConfiguration();
-    sc.setSpan(Span.prefix(rowBuilder.toBytes()));
-
-    RowIterator iter = tx.get(sc);
+    Iterator<RowColumnValue> iter =
+        tx.scanner().over(Span.prefix(rowBuilder.toBytes())).build().iterator();
 
     Iterator<V> ui;
 
     if (iter.hasNext()) {
-      ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
+      ui = Iterators.transform(iter, rcv -> deserVal(rcv.getValue()));
     } else {
       ui = Collections.<V>emptyList().iterator();
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
index a29cf88..e3b80b7 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
@@ -15,20 +15,26 @@
 
 package org.apache.fluo.recipes.core.transaction;
 
-import java.util.AbstractMap;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterators;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.AlreadySetException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 
 /**
  * An implementation of {@link TransactionBase} that logs all transactions operations (GET,
SET, or
@@ -136,50 +142,167 @@ public class RecordingTransactionBase implements TransactionBase {
     return rowColVal;
   }
 
+  private class RtxIterator implements Iterator<RowColumnValue> {
+
+    private Iterator<RowColumnValue> iter;
+
+    public RtxIterator(Iterator<RowColumnValue> iterator) {
+      this.iter = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public RowColumnValue next() {
+      RowColumnValue rcv = iter.next();
+      txLog.filteredAdd(LogEntry.newGet(rcv.getRow(), rcv.getColumn(), rcv.getValue()), filter);
+      return rcv;
+    }
+
+  }
+
+  private class RtxCellSanner implements CellScanner {
+
+    private CellScanner scanner;
+
+    public RtxCellSanner(CellScanner scanner) {
+      this.scanner = scanner;
+    }
+
+    @Override
+    public Iterator<RowColumnValue> iterator() {
+      return new RtxIterator(scanner.iterator());
+    }
+
+  }
+
+  private class RtxCVIterator implements Iterator<ColumnValue> {
+
+    private Iterator<ColumnValue> iter;
+    private Bytes row;
+
+    public RtxCVIterator(Bytes row, Iterator<ColumnValue> iterator) {
+      this.row = row;
+      this.iter = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public ColumnValue next() {
+      ColumnValue cv = iter.next();
+      txLog.filteredAdd(LogEntry.newGet(row, cv.getColumn(), cv.getValue()), filter);
+      return cv;
+    }
+
+  }
+
+  private class RtxColumnScanner implements ColumnScanner {
+
+    private ColumnScanner cs;
+
+    public RtxColumnScanner(ColumnScanner cs) {
+      this.cs = cs;
+    }
+
+    @Override
+    public Iterator<ColumnValue> iterator() {
+      return new RtxCVIterator(cs.getRow(), cs.iterator());
+    }
+
+    @Override
+    public Bytes getRow() {
+      return cs.getRow();
+    }
+
+    @Override
+    public String getsRow() {
+      return cs.getsRow();
+    }
+
+  }
+
+  private class RtxRowScanner implements RowScanner {
+
+    private RowScanner scanner;
+
+    public RtxRowScanner(RowScanner scanner) {
+      this.scanner = scanner;
+    }
+
+    @Override
+    public Iterator<ColumnScanner> iterator() {
+      return Iterators.transform(scanner.iterator(), cs -> new RtxColumnScanner(cs));
+    }
+
+  }
+
+  private class RtxRowScannerBuilder implements RowScannerBuilder {
+
+    private RowScannerBuilder rsb;
+
+    public RtxRowScannerBuilder(RowScannerBuilder rsb) {
+      this.rsb = rsb;
+    }
+
+    @Override
+    public RowScanner build() {
+      return new RtxRowScanner(rsb.build());
+    }
+
+  }
+
+  private class RtxScannerBuilder implements ScannerBuilder {
+
+    private ScannerBuilder sb;
+
+    public RtxScannerBuilder(ScannerBuilder sb) {
+      this.sb = sb;
+    }
+
+    @Override
+    public ScannerBuilder over(Span span) {
+      sb = sb.over(span);
+      return this;
+    }
+
+    @Override
+    public ScannerBuilder fetch(Column... columns) {
+      sb = sb.fetch(columns);
+      return this;
+    }
+
+    @Override
+    public ScannerBuilder fetch(Collection<Column> columns) {
+      sb = sb.fetch(columns);
+      return this;
+    }
+
+    @Override
+    public CellScanner build() {
+      return new RtxCellSanner(sb.build());
+    }
+
+    @Override
+    public RowScannerBuilder byRow() {
+      return new RtxRowScannerBuilder(sb.byRow());
+    }
+
+  }
+
   /**
    * Logs GETs for Row/Columns returned by iterators. Requests that return no data will not
be
    * logged.
    */
   @Override
-  public RowIterator get(ScannerConfiguration config) {
-    final RowIterator rowIter = txb.get(config);
-    if (rowIter != null) {
-      return new RowIterator() {
-
-        @Override
-        public boolean hasNext() {
-          return rowIter.hasNext();
-        }
-
-        @Override
-        public Map.Entry<Bytes, ColumnIterator> next() {
-          final Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
-          if ((rowEntry != null) && (rowEntry.getValue() != null)) {
-            final ColumnIterator colIter = rowEntry.getValue();
-            return new AbstractMap.SimpleEntry<>(rowEntry.getKey(), new ColumnIterator()
{
-
-              @Override
-              public boolean hasNext() {
-                return colIter.hasNext();
-              }
-
-              @Override
-              public Map.Entry<Column, Bytes> next() {
-                Map.Entry<Column, Bytes> colEntry = colIter.next();
-                if (colEntry != null) {
-                  txLog.filteredAdd(
-                      LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()),
-                      filter);
-                }
-                return colEntry;
-              }
-            });
-          }
-          return rowEntry;
-        }
-      };
-    }
-    return rowIter;
+  public ScannerBuilder scanner() {
+    return new RtxScannerBuilder(txb.scanner());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
index 7764e67..1f5eb48 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
@@ -29,11 +29,10 @@ import java.util.Set;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.map.DefaultedMap;
 import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.recipes.core.types.TypeLayer.Data;
 import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
 import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
@@ -509,11 +508,6 @@ public class TypedSnapshotBase implements SnapshotBase {
   }
 
   @Override
-  public RowIterator get(ScannerConfiguration config) {
-    return snapshot.get(config);
-  }
-
-  @Override
   public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column>
columns) {
     return snapshot.get(rows, columns);
   }
@@ -522,6 +516,11 @@ public class TypedSnapshotBase implements SnapshotBase {
     return new ValueRowMethods();
   }
 
+  @Override
+  public ScannerBuilder scanner() {
+    return snapshot.scanner();
+  }
+
   @SuppressWarnings({"unchecked"})
   private Map<Column, Value> wrap(Map<Column, Bytes> map) {
     Map<Column, Value> ret = Maps.transformValues(map, input -> new Value(input));

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
index c1cf3ce..4493fdf 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 
@@ -33,14 +32,14 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
 import org.junit.After;
@@ -240,20 +239,15 @@ public class ExportTestBase {
     Map<String, Set<String>> fluoReferees = new HashMap<>();
 
     try (Snapshot snap = fc.newSnapshot()) {
-      ScannerConfiguration scannerConfig = new ScannerConfiguration();
-      scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
-      scannerConfig.setSpan(Span.prefix("d:"));
-      RowIterator scanner = snap.get(scannerConfig);
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
-        ColumnIterator colIter = row.getValue();
 
-        String docid = row.getKey().toString().substring(2);
+      Column currCol = new Column("content", "current");
+      RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build();
 
-        while (colIter.hasNext()) {
-          Entry<Column, Bytes> entry = colIter.next();
+      for (ColumnScanner columnScanner : rowScanner) {
+        String docid = columnScanner.getsRow().substring(2);
 
-          String[] refs = entry.getValue().toString().split(" ");
+        for (ColumnValue columnValue : columnScanner) {
+          String[] refs = columnValue.getsValue().split(" ");
 
           for (String ref : refs) {
             if (ref.isEmpty())
@@ -269,18 +263,9 @@ public class ExportTestBase {
 
   public static void dump(FluoClient fc) {
     try (Snapshot snap = fc.newSnapshot()) {
-      RowIterator scanner = snap.get(new ScannerConfiguration());
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
-        ColumnIterator colIter = row.getValue();
-
-        while (colIter.hasNext()) {
-          Entry<Column, Bytes> entry = colIter.next();
-
-          System.out.println("row:[" + row.getKey() + "]  col:[" + entry.getKey() + "]  val:["
-              + entry.getValue() + "]");
-        }
-      }
+      CellScanner scanner = snap.scanner().build();
+      scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "]  col:["
+          + rcv.getColumn() + "]  val:[" + rcv.getValue() + "]"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
index e5f7d55..852d117 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
@@ -19,7 +19,6 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -31,13 +30,12 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
 import org.apache.fluo.recipes.core.types.StringEncoder;
@@ -179,21 +177,18 @@ public class BigUpdateIT {
   }
 
   private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
-    RowIterator iter = snap.get(new ScannerConfiguration().setSpan(Span.prefix("side:")));
 
-    int row = 0;
+    RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
 
-    while (iter.hasNext()) {
-      Entry<Bytes, ColumnIterator> entry = iter.next();
+    int row = 0;
 
-      Assert.assertEquals(String.format("side:%06d", row++), entry.getKey().toString());
+    for (ColumnScanner columns : rows) {
+      Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow());
 
-      ColumnIterator colITer = entry.getValue();
-      while (colITer.hasNext()) {
-        Entry<Column, Bytes> entry2 = colITer.next();
-        Assert.assertEquals(new Column("debug", "sum"), entry2.getKey());
-        Assert.assertEquals("row : " + entry.getKey(), "" + expectedVal, entry2.getValue()
-            .toString());
+      for (ColumnValue columnValue : columns) {
+        Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn());
+        Assert
+            .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
index f7dbc89..cb76891 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
@@ -18,7 +18,6 @@ package org.apache.fluo.recipes.core.map;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 
 import com.google.common.collect.ImmutableMap;
@@ -28,14 +27,12 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
 import org.junit.After;
@@ -83,11 +80,11 @@ public class CollisionFreeMapIT {
     Map<String, Long> counts = new HashMap<>();
 
     try (Snapshot snap = fc.newSnapshot()) {
-      RowIterator scanner = snap.get(new ScannerConfiguration().setSpan(Span.prefix("iwc:")));
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
 
-        String[] tokens = row.getKey().toString().split(":");
+      CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build();
+
+      for (RowColumnValue rcv : scanner) {
+        String[] tokens = rcv.getsRow().split(":");
         String word = tokens[2];
         Long count = Long.valueOf(tokens[1]);
 
@@ -104,25 +101,19 @@ public class CollisionFreeMapIT {
     Map<String, Long> counts = new HashMap<>();
 
     try (Snapshot snap = fc.newSnapshot()) {
-      RowIterator scanner =
-          snap.get(new ScannerConfiguration().setSpan(Span.prefix("d:")).fetchColumn(
-              Bytes.of("content"), Bytes.of("current")));
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
-
-        ColumnIterator colIter = row.getValue();
 
-        while (colIter.hasNext()) {
-          Entry<Column, Bytes> entry = colIter.next();
 
-          String[] words = entry.getValue().toString().split("\\s+");
-          for (String word : words) {
-            if (word.isEmpty()) {
-              continue;
-            }
+      CellScanner scanner =
+          snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build();
 
-            counts.merge(word, 1L, Long::sum);
+      for (RowColumnValue rcv : scanner) {
+        String[] words = rcv.getsValue().split("\\s+");
+        for (String word : words) {
+          if (word.isEmpty()) {
+            continue;
           }
+
+          counts.merge(word, 1L, Long::sum);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
index 7c09b6e..1e9b262 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
@@ -15,17 +15,21 @@
 
 package org.apache.fluo.recipes.core.transaction;
 
-import java.util.AbstractMap;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
+import com.google.common.collect.Iterators;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.recipes.core.types.StringEncoder;
 import org.apache.fluo.recipes.core.types.TypeLayer;
 import org.apache.fluo.recipes.core.types.TypedTransaction;
@@ -154,67 +158,64 @@ public class RecordingTransactionTest {
   }
 
   @Test
-  public void testGetScanNull() {
-    ScannerConfiguration scanConfig = new ScannerConfiguration();
-    expect(tx.get(scanConfig)).andReturn(null);
-    replay(tx);
-    Assert.assertNull(rtx.get(scanConfig));
-    verify(tx);
-  }
-
-  @Test
   public void testGetScanIter() {
-    ScannerConfiguration scanConfig = new ScannerConfiguration();
-    expect(tx.get(scanConfig)).andReturn(new RowIterator() {
-
-      private boolean hasNextRow = true;
-
+    ScannerBuilder sb = mock(ScannerBuilder.class);
+    expect(sb.build()).andReturn(new CellScanner() {
       @Override
-      public boolean hasNext() {
-        return hasNextRow;
-      }
-
-      @Override
-      public Map.Entry<Bytes, ColumnIterator> next() {
-        hasNextRow = false;
-        return new AbstractMap.SimpleEntry<>(Bytes.of("r7"), new ColumnIterator() {
-
-          private boolean hasNextCol = true;
-
-          @Override
-          public boolean hasNext() {
-            return hasNextCol;
-          }
-
-          @Override
-          public Map.Entry<Column, Bytes> next() {
-            hasNextCol = false;
-            return new AbstractMap.SimpleEntry<>(new Column("cf7", "cq7"), Bytes.of("v7"));
-          }
-        });
+      public Iterator<RowColumnValue> iterator() {
+        return Iterators
+            .singletonIterator(new RowColumnValue("r7", new Column("cf7", "cq7"), "v7"));
       }
     });
-    replay(tx);
-    RowIterator rowIter = rtx.get(scanConfig);
-    Assert.assertNotNull(rowIter);
-    Assert.assertTrue(rtx.getTxLog().isEmpty());
-    Assert.assertTrue(rowIter.hasNext());
-    Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
-    Assert.assertFalse(rowIter.hasNext());
-    Assert.assertEquals(Bytes.of("r7"), rowEntry.getKey());
-    ColumnIterator colIter = rowEntry.getValue();
-    Assert.assertTrue(colIter.hasNext());
+
+    expect(tx.scanner()).andReturn(sb);
+
+    replay(tx, sb);
+
+    Iterator<RowColumnValue> iter = rtx.scanner().build().iterator();
     Assert.assertTrue(rtx.getTxLog().isEmpty());
-    Map.Entry<Column, Bytes> colEntry = colIter.next();
+    Assert.assertTrue(iter.hasNext());
+    Assert.assertEquals(new RowColumnValue("r7", new Column("cf7", "cq7"), "v7"), iter.next());
     Assert.assertFalse(rtx.getTxLog().isEmpty());
-    Assert.assertFalse(colIter.hasNext());
-    Assert.assertEquals(new Column("cf7", "cq7"), colEntry.getKey());
-    Assert.assertEquals(Bytes.of("v7"), colEntry.getValue());
     List<LogEntry> entries = rtx.getTxLog().getLogEntries();
     Assert.assertEquals(1, entries.size());
     Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
         .toString());
-    verify(tx);
+
+    verify(tx, sb);
+  }
+
+  @Test
+  public void testGetRowScanner() {
+    ColumnScanner cs = mock(ColumnScanner.class);
+    RowScanner rs = mock(RowScanner.class);
+    RowScannerBuilder rsb = mock(RowScannerBuilder.class);
+    ScannerBuilder sb = mock(ScannerBuilder.class);
+
+    expect(cs.getRow()).andReturn(Bytes.of("r7")).times(2);
+    expect(cs.iterator()).andReturn(
+        Iterators.singletonIterator(new ColumnValue(new Column("cf7", "cq7"), "v7")));
+    expect(rs.iterator()).andReturn(Iterators.singletonIterator(cs));
+    expect(rsb.build()).andReturn(rs);
+    expect(sb.byRow()).andReturn(rsb);
+    expect(tx.scanner()).andReturn(sb);
+
+    replay(tx, sb, rsb, rs, cs);
+
+    Iterator<ColumnScanner> riter = rtx.scanner().byRow().build().iterator();
+    Assert.assertTrue(riter.hasNext());
+    ColumnScanner cscanner = riter.next();
+    Assert.assertEquals(Bytes.of("r7"), cscanner.getRow());
+    Iterator<ColumnValue> citer = cscanner.iterator();
+    Assert.assertTrue(citer.hasNext());
+    Assert.assertTrue(rtx.getTxLog().isEmpty());
+    Assert.assertEquals(new ColumnValue(new Column("cf7", "cq7"), "v7"), citer.next());
+    List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+    Assert.assertEquals(1, entries.size());
+    Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
+        .toString());
+
+    verify(tx, sb, rsb, rs, cs);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
index d31b36c..8d87fdf 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
@@ -22,11 +22,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.core.impl.TxStringUtil;
 
 public class MockSnapshotBase implements SnapshotBase {
@@ -81,7 +80,7 @@ public class MockSnapshotBase implements SnapshotBase {
   }
 
   @Override
-  public RowIterator get(ScannerConfiguration config) {
+  public ScannerBuilder scanner() {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
----------------------------------------------------------------------
diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
index 92d7eaf..ddc51ef 100644
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
@@ -34,12 +34,9 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,24 +73,16 @@ public class FluoITHelper {
    */
   public static void printFluoTable(FluoClient client) {
     try (Snapshot s = client.newSnapshot()) {
-      RowIterator iter = s.get(new ScannerConfiguration());
-
       System.out.println("== fluo start ==");
-      while (iter.hasNext()) {
-        Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
-        ColumnIterator citer = rowEntry.getValue();
-        while (citer.hasNext()) {
-          Map.Entry<Column, Bytes> colEntry = citer.next();
-
-          StringBuilder sb = new StringBuilder();
-          Hex.encNonAscii(sb, rowEntry.getKey());
-          sb.append(" ");
-          Hex.encNonAscii(sb, colEntry.getKey(), " ");
-          sb.append("\t");
-          Hex.encNonAscii(sb, colEntry.getValue());
-
-          System.out.println(sb.toString());
-        }
+      for (RowColumnValue rcv : s.scanner().build()) {
+        StringBuilder sb = new StringBuilder();
+        Hex.encNonAscii(sb, rcv.getRow());
+        sb.append(" ");
+        Hex.encNonAscii(sb, rcv.getColumn(), " ");
+        sb.append("\t");
+        Hex.encNonAscii(sb, rcv.getValue());
+
+        System.out.println(sb.toString());
       }
       System.out.println("=== fluo end ===");
     }
@@ -120,37 +109,29 @@ public class FluoITHelper {
     expected = sort(expected);
 
     try (Snapshot s = client.newSnapshot()) {
-      RowIterator rowIter = s.get(new ScannerConfiguration());
+      Iterator<RowColumnValue> fluoIter = s.scanner().build().iterator();
       Iterator<RowColumnValue> rcvIter = expected.iterator();
 
-      while (rowIter.hasNext()) {
-        Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
-        ColumnIterator citer = rowEntry.getValue();
-        while (citer.hasNext() && rcvIter.hasNext()) {
-          Map.Entry<Column, Bytes> colEntry = citer.next();
-          RowColumnValue rcv = rcvIter.next();
-          Column col = colEntry.getKey();
-
-          boolean retval = diff("fluo row", rcv.getRow(), rowEntry.getKey());
-          retval |= diff("fluo fam", rcv.getColumn().getFamily(), col.getFamily());
-          retval |= diff("fluo qual", rcv.getColumn().getQualifier(), col.getQualifier());
-          retval |= diff("fluo val", rcv.getValue(), colEntry.getValue());
-
-          if (retval) {
-            log.error("Difference found - row {} cf {} cq {} val {}", rcv.getRow().toString(),
rcv
-                .getColumn().getFamily().toString(), rcv.getColumn().getQualifier().toString(),
rcv
-                .getValue().toString());
-            return false;
-          }
-
-          log.debug("Verified {}", Hex.encNonAscii(rcv, " "));
-        }
-        if (citer.hasNext()) {
-          log.error("An column iterator still has more data");
+      while (fluoIter.hasNext() && rcvIter.hasNext()) {
+        RowColumnValue actualRcv = fluoIter.next();
+        RowColumnValue rcv = rcvIter.next();
+
+        boolean retval = diff("fluo row", rcv.getRow(), actualRcv.getRow());
+        retval |= diff("fluo fam", rcv.getColumn().getFamily(), actualRcv.getColumn().getFamily());
+        retval |=
+            diff("fluo qual", rcv.getColumn().getQualifier(), actualRcv.getColumn().getQualifier());
+        retval |= diff("fluo val", rcv.getValue(), actualRcv.getValue());
+
+        if (retval) {
+          log.error("Difference found - row {} cf {} cq {} val {}", rcv.getsRow(), rcv.getColumn()
+              .getsFamily(), rcv.getColumn().getsQualifier(), rcv.getsValue());
           return false;
         }
+
+        log.debug("Verified {}", Hex.encNonAscii(rcv, " "));
       }
-      if (rowIter.hasNext() || rcvIter.hasNext()) {
+
+      if (fluoIter.hasNext() || rcvIter.hasNext()) {
         log.error("An iterator still has more data");
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c3561d..abdc1fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
   <properties>
     <accumulo.version>1.7.1</accumulo.version>
     <findbugs.maxRank>13</findbugs.maxRank>
-    <fluo.version>1.0.0-beta-3-SNAPSHOT</fluo.version>
+    <fluo.version>1.0.0-incubating-SNAPSHOT</fluo.version>
     <hadoop.version>2.6.3</hadoop.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>



Mime
View raw message