fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject incubator-fluo-recipes git commit: Updated for new way of registering observers
Date Tue, 09 May 2017 15:42:55 GMT
Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 70a4239c8 -> d4cc0e343


Updated for new way of registering observers


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/d4cc0e34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/d4cc0e34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/d4cc0e34

Branch: refs/heads/master
Commit: d4cc0e343e24b2aa46f2bb04b476546f95d3a2bb
Parents: 70a4239
Author: Keith Turner <kturner@apache.org>
Authored: Fri May 5 18:51:01 2017 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Mon May 8 18:20:59 2017 -0400

----------------------------------------------------------------------
 docs/accumulo-export-queue.md                   |  2 +-
 docs/combine-queue.md                           |  2 +-
 docs/export-queue.md                            |  2 +-
 .../recipes/core/combine/CombineQueueImpl.java  |  6 +++--
 .../fluo/recipes/core/export/ExportQueue.java   |  3 ++-
 .../fluo/recipes/core/map/CollisionFreeMap.java | 24 ++++++++++++++++----
 .../recipes/core/export/it/ExportTestBase.java  |  3 ++-
 7 files changed, 30 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
index b037b52..f9de6c6 100644
--- a/docs/accumulo-export-queue.md
+++ b/docs/accumulo-export-queue.md
@@ -93,7 +93,7 @@ Exporting to Accumulo is easy. Follow the steps below:
             new AccumuloExporter<>(EXPORT_QID, appCfg, new SimpleTranslator()));
 
         // An example observer created using a lambda that adds to the export queue.
-        obsRegistry.register(OBS_COL, WEAK, (tx,row,col) -> {
+        obsRegistry.forColumn(OBS_COL, WEAK).useObserver((tx,row,col) -> {
           // Read some data and do some work
 
           // Add results to export queue

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/docs/combine-queue.md
----------------------------------------------------------------------
diff --git a/docs/combine-queue.md b/docs/combine-queue.md
index b3c1b13..0cce71b 100644
--- a/docs/combine-queue.md
+++ b/docs/combine-queue.md
@@ -176,7 +176,7 @@ public class WcObserverProvider implements ObserverProvider {
     CombineQueue<String, Long> wcMap = CombineQueue.getInstance(ID, ctx.getAppConfiguration());
 
     // Register observer that updates the Combine Queue
-    obsRegistry.register(DocumentObserver.NEW_COL, STRONG, new DocumentObserver(wcMap));
+    obsRegistry.forColumn(DocumentObserver.NEW_COL, STRONG).useObserver(new DocumentObserver(wcMap));
 
     // Used to join new and existing values for a key. The lambda sums all values and returns
     // Optional.empty() when the sum is zero. Returning Optional.empty() causes the key/value
to be

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index 11e8d17..ec4b890 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -152,7 +152,7 @@ public class FluoApp {
           ExportQueue.getInstance(EQ_ID, ctx.getAppConfiguration());
 
       // register observer that will queue data to export
-      obsRegistry.register(UPDATE_COL, STRONG, new MyObserver(expQ));
+      obsRegistry.forColumn(UPDATE_COL, STRONG).useObserver(new MyObserver(expQ));
 
       // register observer that will export queued data
       expQ.registerObserver(obsRegistry, new CountExporter());

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java
index bf97a5e..48d2651 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java
@@ -53,6 +53,7 @@ class CombineQueueImpl<K, V> implements CombineQueue<K, V> {
   private Bytes dataPrefix;
   private Column notifyColumn;
 
+  private final String cqId;
   private final Class<K> keyType;
   private final Class<V> valType;
   private final int numBuckets;
@@ -61,6 +62,7 @@ class CombineQueueImpl<K, V> implements CombineQueue<K, V> {
 
   @SuppressWarnings("unchecked")
   CombineQueueImpl(String cqId, SimpleConfiguration appConfig) throws Exception {
+    this.cqId = cqId;
     this.updatePrefix = Bytes.of(cqId + ":u:");
     this.dataPrefix = Bytes.of(cqId + ":d:");
     this.notifyColumn = new Column("fluoRecipes", "cfm:" + cqId);
@@ -314,7 +316,7 @@ class CombineQueueImpl<K, V> implements CombineQueue<K, V>
{
   @Override
   public void registerObserver(Registry obsRegistry, Combiner<K, V> combiner,
       ChangeObserver<K, V> changeObserver) {
-    obsRegistry.register(notifyColumn, NotificationType.WEAK,
-        (tx, row, col) -> process(tx, row, col, combiner, changeObserver));
+    obsRegistry.forColumn(notifyColumn, NotificationType.WEAK).withId("combineq-" + cqId)
+        .useObserver((tx, row, col) -> process(tx, row, col, combiner, changeObserver));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index e9618be..264396f 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@ -257,7 +257,8 @@ public class ExportQueue<K, V> {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    obsRegistry.register(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK,
obs);
+    obsRegistry.forColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK)
+        .withId("exportq-" + queueId).useObserver(obs);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 4c7f86b..2293de7 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -72,14 +72,28 @@ public class CollisionFreeMap<K, V> {
 
     Observer observer;
 
-    @Override
-    public void register(Column observedColumn, NotificationType ntfyType, Observer observer)
{
-      this.observer = observer;
+    private class Registry implements ObserverProvider.Registry.ObserverArgument,
+        ObserverProvider.Registry.IdentityOption {
+
+      @Override
+      public ObserverArgument withId(String alias) {
+        return this;
+      }
+
+      @Override
+      public void useObserver(Observer obs) {
+        observer = obs;
+      }
+
+      @Override
+      public void useStrObserver(StringObserver obs) {
+        observer = obs;
+      }
     }
 
     @Override
-    public void registers(Column observedColumn, NotificationType ntfyType, StringObserver
observer) {
-      this.observer = observer;
+    public IdentityOption forColumn(Column observedColumn, NotificationType ntfyType) {
+      return new Registry();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/d4cc0e34/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
index 48050ed..8625c59 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
@@ -144,7 +144,8 @@ public class ExportTestBase {
       ExportQueue<String, RefUpdates> refExportQueue =
           ExportQueue.getInstance(RefExporter.QUEUE_ID, ctx.getAppConfiguration());
 
-      or.register(new Column("content", "new"), STRONG, new DocumentObserver(refExportQueue));
+      or.forColumn(new Column("content", "new"), STRONG).useObserver(
+          new DocumentObserver(refExportQueue));
       refExportQueue.registerObserver(or, new RefExporter());
     }
   }


Mime
View raw message