fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] incubator-fluo-recipes git commit: Fixed bug with continuance introed in #71
Date Mon, 18 Jul 2016 18:00:48 GMT
Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 22354d0f7 -> b4c9cdce8


Fixed bug with continuance introed in #71


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/b4c9cdce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/b4c9cdce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/b4c9cdce

Branch: refs/heads/master
Commit: b4c9cdce85be5c3d8932aa11086d90fd0e864055
Parents: 18933f2
Author: Keith Turner <kturner@apache.org>
Authored: Fri Jul 15 16:43:50 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Jul 15 18:59:15 2016 -0400

----------------------------------------------------------------------
 .../fluo/recipes/core/export/ExportObserver.java      |  7 +++++--
 .../fluo/recipes/core/map/CollisionFreeMap.java       | 14 ++++++++++----
 .../org/apache/fluo/recipes/core/map/BigUpdateIT.java |  1 -
 3 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/b4c9cdce/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
index e09a376..2c41313 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
@@ -125,10 +125,13 @@ public class ExportObserver<K, V> extends AbstractObserver {
 
     exporter.processExports(exportIterator);
 
-    if (input.hasNext()) {
-      // not everything was processed so notify self
+    if (input.hasNext() || continueRow != null) {
+      // not everything was processed so notify self OR new data may have been inserted above the
+      // continue row
       bucket.notifyExportObserver();
+    }
 
+    if (input.hasNext()) {
       if (!memLimitIter.hasNext()) {
         // stopped because of mem limit... set continue key
         bucket.setContinueRow(input.next());

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/b4c9cdce/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index c6c0918..ccb250f 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -125,8 +125,6 @@ public class CollisionFreeMap<K, V> {
       span = Span.prefix(ntfyRow);
     }
 
-    // TODO
-    span = Span.prefix(ntfyRow);
     Iterator<RowColumnValue> iter = tx.scanner().over(span).fetch(UPDATE_COL).build().iterator();
 
     Map<Bytes, List<Bytes>> updates = new HashMap<>();
@@ -134,6 +132,7 @@ public class CollisionFreeMap<K, V> {
     long approxMemUsed = 0;
 
     Bytes partiallyReadKey = null;
+    boolean setNextKey = false;
 
     if (iter.hasNext()) {
       Bytes lastKey = null;
@@ -178,8 +177,7 @@ public class CollisionFreeMap<K, V> {
           tx.set(ntfyRow, NEXT_COL, nextPossible);
         }
 
-        // may not read all data because of mem limit, so notify self
-        tx.setWeakNotification(ntfyRow, col);
+        setNextKey = true;
       } else if (nextKey != null) {
         // clear nextKey
         tx.delete(ntfyRow, NEXT_COL);
@@ -188,6 +186,14 @@ public class CollisionFreeMap<K, V> {
       tx.delete(ntfyRow, NEXT_COL);
     }
 
+    if (nextKey != null || setNextKey) {
+      // If not all data was read need to run again in the future. If scanning was started in the
+      // middle of the bucket, its possible there is new data before nextKey that still needs to be
+      // processed. If scanning stopped before reading the entire bucket there may be data after the
+      // stop point.
+      tx.setWeakNotification(ntfyRow, col);
+    }
+
     byte[] dataPrefix = ntfyRow.toArray();
     // TODO this is awful... no sanity check... hard to read
     dataPrefix[Bytes.of(mapId).length() + 1] = 'd';

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/b4c9cdce/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
index 852d117..66056c6 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
@@ -177,7 +177,6 @@ public class BigUpdateIT {
   }
 
   private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
-
     RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
 
     int row = 0;


Mime
View raw message