fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [incubator-fluo-website] branch gh-pages updated: fixes #64 updated Tour for new Observer API (#67)
Date Thu, 29 Jun 2017 17:58:12 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch gh-pages
in repository https://gitbox.apache.org/repos/asf/incubator-fluo-website.git


The following commit(s) were added to refs/heads/gh-pages by this push:
     new 9af597b  fixes #64 updated Tour for new Observer API (#67)
9af597b is described below

commit 9af597b4dce9964c4328d0ea65ab83b2cee7bff4
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Thu Jun 29 13:58:10 2017 -0400

    fixes #64 updated Tour for new Observer API (#67)
---
 tour/application-configuration.md | 118 +++++++++---------------
 tour/basic-read-write.md          |   9 +-
 tour/collision-code.md            |   2 +-
 tour/exercise-1.md                | 189 +++++++++++++++-----------------------
 tour/mem-self-ntfy-code.md        |  39 ++++----
 tour/observer_example.md          |  36 ++++----
 tour/observers.md                 |  13 ++-
 tour/snapshot-isolation.md        |   2 +-
 tour/weak-code.md                 |  44 +++++----
 tour/write-skew.md                |   5 +-
 10 files changed, 192 insertions(+), 265 deletions(-)

diff --git a/tour/application-configuration.md b/tour/application-configuration.md
index 155ec8e..5de8503 100644
--- a/tour/application-configuration.md
+++ b/tour/application-configuration.md
@@ -14,7 +14,7 @@ To use application configuration, set properties with the prefix `fluo.app`
in y
 file before initialization.  Alternatively use [FluoConfiguration.getAppConfiguration()][fcogac]
to
 set these properties programmatically.  After Fluo is initialized this information can be
accessed
 anywhere by calling [FluoClient.getAppConfiguration()][fclgac],
-[Observer.Context.getAppConfigurtaion()][ocgac], or [Loader.Context.getAppConfiguration()][lcgac].
+[ObserverProvider.Context.getAppConfigurtaion()][opgac], or [Loader.Context.getAppConfiguration()][lcgac].
 
 The following is a simple example of using application config.   This example sets some application
 config before initialization.  After initialization the configuration is accessed via
@@ -46,79 +46,51 @@ table5
 
 ## Observer Configuration
 
-If you want instances of an Observer to behave differently and share code, one way to accomplish
-this is with per observer configuration.  When setting up an observer call one of the
-[ObserverSpecification][ospec] methods that takes configuration.  When an observer is initialized
it
-can access this configuration by calling [Observer.Context.getObserverConfiguration()][ocgp].
-
-The code below shows an example of setting configuration for an Observer.  This example simulates
an
-observer that can export rows to a mysql table. The example configures two instances of an
observer
-using the same class with different configuration.  Even though the observers use the same
class, the
-two instances must observe different columns.  That is why the code derives the observed
column based
-on the observer configuration.  Notice the mysql database is obtained from application configuration
by
-the observer and the table is obtained from observer configuration.
+If you want to use the same code to create multiple observers, one way to accomplish this
is
+with application configuration. The code below shows an example of this.  The example simulates
+exporting rows to multiple mysql tables.  To do this, it creates an observers per a export
+table. The observed column and export table for each observer is derived from application
+configuration.
 
 ```java
-package ft;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.Observer;
-
-public class MysqlExportObserver implements Observer {
-
-  private String exportDB;
-  private String exportTable;
-
-  @Override
-  public void close() {}
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    Column col = new Column("ET", exportTable);
-    return new ObservedColumn(col, NotificationType.WEAK);
-  }
-
-  @Override
-  public void init(Context ctx) throws Exception {
-    exportDB = ctx.getAppConfiguration().getString("exportDB");
-    exportTable = ctx.getObserverConfiguration().getString("exportTable");
-  }
-
-  @Override
-  public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-    CellScanner scanner = tx.scanner().over(Span.exact(row)).build();
-
-    for (RowColumnValue rcv : scanner) {
-      System.out.printf("Exporting val:%s from row:%s to db/table:%s/%s\n", rcv.getsValue(),
row,
-          exportDB, exportTable);
-      tx.delete(rcv.getRow(), rcv.getColumn());
+  public static class TourObserverProvider implements ObserverProvider {
+    @Override
+    public void provide(Registry obsRegistry, Context ctx) {
+      SimpleConfiguration appCfg = ctx.getAppConfiguration();
+      String exportDB = appCfg.getString("exportDB");
+
+      // Create an observer for each export table
+      for (Entry<String, String> entry : appCfg.subset("exportTables").toMap().entrySet())
{
+        String exportId = entry.getKey();
+        String exportTable = entry.getValue();
+
+        Column exportNtfyCol = new Column("ET", exportId);
+
+        Observer exportObserver = (tx, row, col) -> {
+          CellScanner scanner = tx.scanner().over(Span.exact(row)).build();
+
+          for (RowColumnValue rcv : scanner) {
+            System.out.printf("Exporting val=%s from row=%s to db=%s table=%s\n", rcv.getsValue(),
+                row, exportDB, exportTable);
+            tx.delete(rcv.getRow(), rcv.getColumn());
+          }
+        };
+
+        obsRegistry.forColumn(exportNtfyCol, NotificationType.WEAK).useObserver(exportObserver);
+      }
     }
   }
-}
-```
 
-The following code initializes two observers using the same class with different configuration.
 It
-also sets application configuration that is used by the observers.  The code then writes
some data
-and notifies the two observers which process the data.
-
-```java
   private static void preInit(FluoConfiguration fluoConfig) {
     SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
     appConfig.setProperty("exportDB", "db1");
 
-    ObserverSpecification observer1 = new ObserverSpecification(MysqlExportObserver.class.getName(),
-        Collections.singletonMap("exportTable", "table9"));
-
-    ObserverSpecification observer2 = new ObserverSpecification(MysqlExportObserver.class.getName(),
-        Collections.singletonMap("exportTable", "table3"));
+    // An observer will be created to process each export table. In this example 't1' and
't2'
+    // are used as logical IDs for export tables.
+    appConfig.setProperty("exportTables.t1", "bigtable");
+    appConfig.setProperty("exportTables.t2", "tinytable");
 
-    fluoConfig.addObserver(observer1);
-    fluoConfig.addObserver(observer2);
+    fluoConfig.setObserverProvider(TourObserverProvider.class);
   }
 
   private static void exercise(MiniFluo mini, FluoClient client) {
@@ -127,7 +99,7 @@ and notifies the two observers which process the data.
       tx.set("e:99", new Column("export", "data2"), "444");
       tx.set("e:99", new Column("export", "data3"), "555");
 
-      tx.setWeakNotification("e:99", new Column("ET", "table3"));
+      tx.setWeakNotification("e:99", new Column("ET", "t1"));
 
       tx.commit();
     }
@@ -138,7 +110,7 @@ and notifies the two observers which process the data.
       tx.set("e:98", new Column("export", "data2"), "888");
       tx.set("e:98", new Column("export", "data3"), "999");
 
-      tx.setWeakNotification("e:98", new Column("ET", "table9"));
+      tx.setWeakNotification("e:98", new Column("ET", "t2"));
 
       tx.commit();
     }
@@ -150,17 +122,17 @@ and notifies the two observers which process the data.
 Running the code above prints the following.
 
 ```
-Exporting val:222 from row:e:99 to db/table:db1/table3
-Exporting val:777 from row:e:98 to db/table:db1/table9
-Exporting val:444 from row:e:99 to db/table:db1/table3
-Exporting val:888 from row:e:98 to db/table:db1/table9
-Exporting val:555 from row:e:99 to db/table:db1/table3
-Exporting val:999 from row:e:98 to db/table:db1/table9
+Exporting val=777 from row=e:98 to db=db1 table=tinytable
+Exporting val=888 from row=e:98 to db=db1 table=tinytable
+Exporting val=999 from row=e:98 to db=db1 table=tinytable
+Exporting val=222 from row=e:99 to db=db1 table=bigtable
+Exporting val=444 from row=e:99 to db=db1 table=bigtable
+Exporting val=555 from row=e:99 to db=db1 table=bigtable
 ```
 
 [fcogac]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/config/FluoConfiguration.html#getAppConfiguration--
 [fclgac]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/FluoClient.html#getAppConfiguration--
-[ocgac]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/observer/Observer.Context.html#getAppConfiguration--
+[opgac]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/observer/ObserverProvider.Context.html#getAppConfiguration--
 [lcgac]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/Loader.Context.html#getAppConfiguration--
 [ospec]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release}}/org/apache/fluo/api/config/ObserverSpecification.html
 [ocgp]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/observer/Observer.Context.html#getObserverConfiguration--
diff --git a/tour/basic-read-write.md b/tour/basic-read-write.md
index d1ffe49..be6e751 100644
--- a/tour/basic-read-write.md
+++ b/tour/basic-read-write.md
@@ -2,10 +2,9 @@
 title: Read and Write Data
 ---
 
-The following example shows basic code for writing data using Fluo and then reading it back
out.  To
-run this code, modify `src/main/java/ft/Main.java` in the [Fluo Tour git repository][1] and
run it by
-following the instructions in the repository.
-
+The following  shows Java code for writing and reading data using Fluo.  In your local clone,
modify
+the `exercise(MiniFluo, FluoClient)` function in [src/main/java/ft/Main.java][main].  Then
run
+`ft.Main` as previously mentioned.
 
 ```java
   private static void exercises(MiniFluo mini, FluoClient client) {
@@ -39,7 +38,7 @@ assume UTF-8 encoding when converting to bytes.
 Transactions and snapshots allocate resources and therefore have `close()` methods.  The
 try-with-resources block will automatically call `close()`, even when exceptions occur.
 
-[1]: https://github.com/apache/incubator-fluo-website/tree/fluo-tour
+[main]: https://github.com/apache/incubator-fluo-website/tree/fluo-tour/src/main/java/ft/Main.java
 [get]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/SnapshotBase.html#get-org.apache.fluo.api.data.Bytes-org.apache.fluo.api.data.Column-
 [gets]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/SnapshotBase.html#gets-java.lang.CharSequence-org.apache.fluo.api.data.Column-
 [Bytes]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/data/Bytes.html
diff --git a/tour/collision-code.md b/tour/collision-code.md
index 8eeef7e..0f869d3 100644
--- a/tour/collision-code.md
+++ b/tour/collision-code.md
@@ -35,7 +35,7 @@ title: Collision code
 The code above prints :
 
 ```
-tx3 commit exception message : org.apache.fluo.api.exceptions.CommitException
+tx3 commit exception : org.apache.fluo.api.exceptions.CommitException: Collsions(1):kerbalnaut0001
name last
 Kerman
 ```
 
diff --git a/tour/exercise-1.md b/tour/exercise-1.md
index c9a99d3..f1652ba 100644
--- a/tour/exercise-1.md
+++ b/tour/exercise-1.md
@@ -220,9 +220,9 @@ import java.util.*;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.*;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.StringObserver;
 
-public class ContentObserver extends AbstractObserver {
+public class ContentObserver implements StringObserver {
 
   public static final Column PROCESSED_COL = new Column("doc", "processed");
   public static final Column WORD_COUNT = new Column("word","docCount");
@@ -247,9 +247,7 @@ public class ContentObserver extends AbstractObserver {
 
 
   @Override
-  public void process(TransactionBase tx, Bytes brow, Column col) throws Exception {
-
-    String row = brow.toString();
+  public void process(TransactionBase tx, String row, Column col) throws Exception {
 
     Map<Column, String> colVals =
         tx.gets(row, DocLoader.CONTENT_COL, DocLoader.REF_STATUS_COL, PROCESSED_COL);
@@ -258,18 +256,12 @@ public class ContentObserver extends AbstractObserver {
     String status = colVals.get(DocLoader.REF_STATUS_COL);
     String processed = colVals.getOrDefault(PROCESSED_COL, "false");
 
-    // TODO if status is referenced and not already processed the adjustCounts by +1 and
set
+    // TODO if status is referenced and not already processed then adjustCounts by +1 and
set
     // PROCESSED_COL to true
 
-    // TODO is status is unreferenced then delete all columns for content
+    // TODO if status is unreferenced then delete all columns for content
     // TODO if status is unreferenced and document was processed, then adjust counts by -1
   }
-
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(DocLoader.REF_STATUS_COL, NotificationType.STRONG);
-  }
 }
 ```
 
@@ -280,8 +272,16 @@ When you are ready to run the observer, modify the `preInit()` method
in `ft.Mai
 observer as follows.
 
 ```java
+  public static class TourObserverProvider implements ObserverProvider {
+    @Override
+    public void provide(Registry obsRegistry, Context ctx) {
+      obsRegistry.forColumn(DocLoader.REF_STATUS_COL, NotificationType.STRONG)
+          .useObserver(new ContentObserver());;
+    }
+  }
+
   private static void preInit(FluoConfiguration fluoConfig) {
-    fluoConfig.addObserver(new ObserverSpecification(ContentObserver.class.getName()));
+    fluoConfig.setObserverProvider(TourObserverProvider.class);
   }
 ```
 
@@ -393,111 +393,50 @@ w:stuck word docCount	1
 
 ## Part 3 : Using Fluo Recipes
 
-The way to compute word counts above is very prone to transactional collisions. One way to
avoid
-these collisions is to use the CollisionFreeMap(CFM) provided in Fluo Recipes.  The CFM will
queue
+The suggested method of computing word counts above is prone to transaction collisions. One
way to avoid
+collisions is to use a CombineQueue provided by Fluo Recipes.  This will queue
 updates for words and notify another observer to process the queued updates.  The updates
are queued
-in a way that will not cause collisions.  The CFM has its own Observer which will call two
functions
-you provide.  The code below shows an example of these two functions and how to configure
the CFM to
-call them.
+in a way that will not cause collisions.  The CombineQueue has its own Observer which will
call two functions
+you provide.  One function combines updates for a key and the other consumes changes to values.
 
-To try using a CFM, first add the following class.
+To try using a CombineQueue, first add the following class.  This class handles changes to
word
+counts by updating an inverted index of word counts.
 
 ```java
 package ft;
 
-import java.util.*;
-
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.recipes.core.map.CollisionFreeMap;
-import org.apache.fluo.recipes.core.map.Combiner;
-import org.apache.fluo.recipes.core.map.Update;
-import org.apache.fluo.recipes.core.map.UpdateObserver;
+import org.apache.fluo.recipes.core.combine.ChangeObserver;
 
 import static ft.ContentObserver.WORD_COUNT;
 
-/**
- * This class contains all of the code related to the {@link CollisionFreeMap} that keeps
track of
- * word counts.  It also generates an inverted index of word counts as an example follow
on action.
- */
-public class WordCounter {
-
-  /**
-   * the {@link CollisionFreeMap} Observer calls this combiner to processes the queued updates
for
-   * a word.
-   */
-  public static class LongCombiner implements Combiner<String, Long>{
-
-    @Override
-    public Optional<Long> combine(String k, Iterator<Long> counts) {
-      long sum = 0;
-      while(counts.hasNext()) {
-        sum += counts.next();
-      }
+public class WordCountChangeObserver implements ChangeObserver<String, Long> {
 
-      if(sum == 0) {
-        //returning emtpy will cause the CFM to delete the key in Fluo's table
-        return Optional.empty();
-      } else {
-        return Optional.of(sum);
-      }
-    }
-  }
-
-  /**
-   * The {@link CollisionFreeMap} Observer will call this class when the counts for a word
change.
-   */
-  public static class WordObserver extends UpdateObserver<String, Long> {
-    @Override
-    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>>
updates) {
-      System.out.println("== begin CFM updates ==");  //this print to show per bucket processing
-      while(updates.hasNext()) {
-        Update<String, Long> u = updates.next();
-
-        long oldCount = u.getOldValue().orElse(0l);
-        long newCount = u.getNewValue().orElse(0l);
+  @Override
+  public void process(TransactionBase tx, Iterable<Change<String, Long>> changes)
{
+    // This print shows per bucket processing.
+    System.out.println("== begin processing word count changes ==");
 
-        //create an inverted index of word counts
-        if(u.getOldValue().isPresent()) {
-          tx.delete(String.format("ic:%06d:%s", oldCount, u.getKey()), WORD_COUNT);
-        }
+    for (Change<String, Long> change : changes) {
 
-        if(u.getNewValue().isPresent()) {
-          tx.set(String.format("ic:%06d:%s", newCount, u.getKey()), WORD_COUNT, "");
-        }
+      long oldCount = change.getOldValue().orElse(0l);  //previous count for a word
+      long newCount = change.getNewValue().orElse(0l);  //new count for a word
 
-        System.out.printf("  update %s %d -> %d\n", u.getKey(), oldCount , newCount);
+      if (change.getOldValue().isPresent()) {
+        // delete old count for word from inverted index
+        tx.delete(String.format("ic:%06d:%s", oldCount, change.getKey()), WORD_COUNT);
       }
-      System.out.println("== end CFM updates ==");
-    }
-  }
 
-  /**
-   * Code to setup a CFM's Observer and configure it to call your functions.
-   */
-  public static void configure(FluoConfiguration fluoConfig, int numBuckets) {
-    CollisionFreeMap.configure(fluoConfig, new CollisionFreeMap.Options("wc", LongCombiner.class,
-        WordObserver.class, String.class, Long.class, 3));
-  }
-
-  private CollisionFreeMap<String, Long> cfm;
-
-  WordCounter(SimpleConfiguration appConfig){
-    cfm = CollisionFreeMap.getInstance("wc", appConfig);
-  }
-
-  /**
-   * This method will queue updates for each word to be processed later by the CFM Observer.
-   */
-  void adjustCounts(TransactionBase tx, int delta, List<String> words){
-    HashMap<String, Long> wcUpdates = new HashMap<>();
+      if (change.getNewValue().isPresent()) {
+        // insert new count for word into inverted index
+        tx.set(String.format("ic:%06d:%s", newCount, change.getKey()), WORD_COUNT, "");
+      }
 
-    for (String word : words) {
-      wcUpdates.put(word, (long)delta);
+      // change.getKey() is a word
+      System.out.printf("  update %s %d -> %d\n", change.getKey(), oldCount, newCount);
     }
 
-    cfm.update(tx, wcUpdates);
+    System.out.println("== end processing word count changes ==");
   }
 }
 ```
@@ -505,41 +444,59 @@ public class WordCounter {
 Then modify `preInit()` in `Main` to the following.
 
 ```java
+  public static class TourObserverProvider implements ObserverProvider {
+    @Override
+    public void provide(Registry obsRegistry, Context ctx) {
+      CombineQueue<String, Long> wordCQ = CombineQueue.getInstance("wc", ctx.getAppConfiguration());
+
+      // Pass the word count combine queue to the ContentObserver so it can queue updates

+      // when a documents word counts change.
+      obsRegistry.forColumn(DocLoader.REF_STATUS_COL, NotificationType.STRONG)
+          .useObserver(new ContentObserver(wordCQ));
+
+      // Register observer to process queued updates, the observer will call the two functions.
+      // SummingCombiner is provided by Fluo Recipes.  It sums all updates queued for a key.
+      wordCQ.registerObserver(obsRegistry, new SummingCombiner<>(), new WordCountChangeObserver());
+    }
+  }
+
   private static void preInit(FluoConfiguration fluoConfig) {
-    fluoConfig.addObserver(new ObserverSpecification(ContentObserver.class.getName()));
-    WordCounter.configure(fluoConfig, 3);
+    fluoConfig.setObserverProvider(TourObserverProvider.class);
+    CombineQueue.configure("wc").keyType(String.class).valueType(Long.class).buckets(3)
+        .save(fluoConfig);
   }
 ```
 
-After that add the following `init()` method to `ContentObserver` and modify `adjustCounts()`
to the
+Add a constuctor to `ContentObserver` and modify `adjustCounts()` to the
 following.
 
 ```java
-  private WordCounter wordCounter;
 
-  @Override
-  public void init(Context context) throws Exception {
-    //get an instance of the CFM based on application config
-    wordCounter = new WordCounter(context.getAppConfiguration());
+  private CombineQueue<String, Long> wordCQ;
+
+  public ContentObserver(CombineQueue<String, Long> wordCQ) {
+    this.wordCQ = wordCQ;
   }
 
   private void adjustCounts(TransactionBase tx, int delta, List<String> words) {
-    wordCounter.adjustCounts(tx, delta, words);
+    HashMap<String, Long> wcUpdates = new HashMap<>();
+    words.forEach(word -> wcUpdates.put(word, (long) delta));
+    wordCQ.addAll(tx, wcUpdates);
   }
 ```
 
-The CFM groups key values into buckets for efficiency and processes the updates for entire
bucket in
-a single transaction.  When you run this new code, that is why `== begin CFM updates ==`
is seen at
-least three times for each group of documents loaded.
+A CombineQueue  groups key values into buckets for efficiency and processes entire buckets
in
+a single transaction.  When you run this code, that is why `== begin processing word count
changes ==` is seen 
+multiple times for each group of documents loaded.
 
-When you run this example you will also notice two new prefixes in the output of the table
scan.  First
-the `wc:` prefix is where the CFM stores its data.  By default the CFM uses Kryo for serialization
+These changes produce two new prefixes in the output of the table scan.  First
+the `wc:` prefix is where the CombineQueue stores its data.  By default the CombineQueue
uses Kryo for serialization
 and therefore the key values with this prefix contain non-ASCII characters.  The utility
function
 `FluoITHelper.printFluoTable()` escapes non-ASCII characters with `\x<HEX>`.   Second
the `ic:`
-prefix contains an inverted index of word counts.  This was created simply to show an example
of a
+prefix contains the inverted index of word counts.  This was created simply to show an example
of a
 follow on action when word counts change.  Ideally this follow on action would have a low
chance of
 collisions.  Creating the inverted index will not cause collisions because each word is in
a single
-CFM bucket and each bucket is processed independently.
+CombineQueue bucket and each bucket is processed independently.
 
 ## Part 4 : Running this example on a real instance.
 
diff --git a/tour/mem-self-ntfy-code.md b/tour/mem-self-ntfy-code.md
index deea13d..94e2579 100644
--- a/tour/mem-self-ntfy-code.md
+++ b/tour/mem-self-ntfy-code.md
@@ -8,42 +8,33 @@ title: Memory limits and self notify code
   static Column UPDATE_COL = new Column("sum", "update");
   static Column CONTINUE_COL = new Column("sum", "continue");
 
-  public static class SummingObserver extends AbstractObserver {
+  public static class SummingObserver implements StringObserver {
 
     private int maxToProcess;
 
-    @Override
-    public void init(Context context) throws Exception {
-      //made the max amount to process in a single transaction configurable
-      maxToProcess = context.getObserverConfiguration().getInt("maxToProcess", 100);
-    }
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(NC, NotificationType.WEAK);
+    SummingObserver(int maxToProcess) {
+      this.maxToProcess = maxToProcess;
     }
 
     @Override
-    public void process(TransactionBase tx, Bytes brow, Column col) throws Exception {
-
-      String row = brow.toString();
+    public void process(TransactionBase tx, String row, Column col) throws Exception {
 
       Map<Column, String> colVals = tx.gets(row, TOTAL_COL, CONTINUE_COL);
 
       int sum = Integer.parseInt(colVals.getOrDefault(TOTAL_COL, "0"));
-      
+
       // construct a scan range that uses the continue row
       String startRow = colVals.getOrDefault(CONTINUE_COL, row + "/");
       String endRow = row + "/:"; // after the character '9' comes ':'
       CellScanner scanner = tx.scanner().over(new Span(startRow, true, endRow, false)).build();
 
       int processed = 0;
-      
+
       for (RowColumnValue rcv : scanner) {
         if (processed >= maxToProcess) {
           // stop processing and set the continue row
           tx.set(row, CONTINUE_COL, rcv.getsRow());
-          tx.setWeakNotification(brow, col);
+          tx.setWeakNotification(row, col);
           break;
         }
         sum += Integer.parseInt(rcv.getsValue());
@@ -60,15 +51,23 @@ title: Memory limits and self notify code
         tx.delete(row, CONTINUE_COL);
         // need to start over at the beginning and see if there is new data before the continue
         // column
-        tx.setWeakNotification(brow, col);
+        tx.setWeakNotification(row, col);
       }
     }
   }
 
+  public static class TourObserverProvider implements ObserverProvider {
+    @Override
+    public void provide(Registry obsRegistry, Context ctx) {
+      int maxToProcess = ctx.getAppConfiguration().getInt("maxToProcess");
+      obsRegistry.forColumn(NC, NotificationType.WEAK)
+          .useObserver(new SummingObserver(maxToProcess));
+    }
+  }
+
   private static void preInit(FluoConfiguration fluoConfig) {
-    ObserverSpecification ospec = new ObserverSpecification(SummingObserver.class.getName());
-    ospec.getConfiguration().setProperty("maxToProcess", 500);
-    fluoConfig.addObserver(ospec);
+    fluoConfig.getAppConfiguration().setProperty("maxToProcess", 500);
+    fluoConfig.setObserverProvider(TourObserverProvider.class);
   }
 
   private static void exercise(MiniFluo mini, FluoClient client) {
diff --git a/tour/observer_example.md b/tour/observer_example.md
index 8493e6e..2b374e8 100644
--- a/tour/observer_example.md
+++ b/tour/observer_example.md
@@ -6,43 +6,47 @@ The following code shows how to setup and trigger an observer.  The observer
is
 column *obs:data* is changed.
 
 ```java
-  public static final Column OBSERVED_COL = new Column("obs","data");
-  public static final Column INVERT_COL = new Column("inv","data");
+  public static final Column OBSERVED_COL = new Column("obs", "data");
+  public static final Column INVERT_COL = new Column("inv", "data");
 
-  public static class MyObserver extends AbstractObserver {
+  // This class is responsible for registering Observers for all observed columns.
+  public static class MyObserverProvider implements ObserverProvider {
 
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-      //invert column and value
-      Bytes value = tx.get(row, col);
-      tx.set(value, INVERT_COL, row);
-    }
+    public void provide(Registry obsRegistry, Context ctx) {
 
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(OBSERVED_COL, NotificationType.STRONG);
+      // Observer is a functional interface allowing Observers to be lambdas
+      Observer invObserver = (tx, row, col) -> {
+        Bytes value = tx.get(row, col);
+        tx.set(value, INVERT_COL, row);
+      };
+
+      // Register an observer to process notifications for the column obs:data
+      obsRegistry.forColumn(OBSERVED_COL, NotificationType.STRONG).useObserver(invObserver);
     }
+
   }
 
   private static void preInit(FluoConfiguration fluoConfig) {
-    //configure Fluo to use MyObserver before initialization
-    fluoConfig.addObserver(new ObserverSpecification(MyObserver.class.getName()));
+    // Configure ObserverProvider before initialization. Workers will instantiate this class
and use
+    // it to create Observers.
+    fluoConfig.setObserverProvider(MyObserverProvider.class);
   }
 
   private static void exercise(MiniFluo mini, FluoClient client) {
-    try(Transaction tx1 = client.newTransaction()) {
+    try (Transaction tx1 = client.newTransaction()) {
       tx1.set("kerbalnaut0001", OBSERVED_COL, "Jebediah");
       tx1.commit();
     }
 
-    try(Transaction tx2 = client.newTransaction()) {
+    try (Transaction tx2 = client.newTransaction()) {
       tx2.set("kerbalnaut0002", OBSERVED_COL, "Bill");
       tx2.commit();
     }
 
     mini.waitForObservers();
 
-    try(Snapshot snap = client.newSnapshot()) {
+    try (Snapshot snap = client.newSnapshot()) {
       snap.scanner().build().forEach(System.out::println);
     }
   }
diff --git a/tour/observers.md b/tour/observers.md
index 0f98629..83e2ad1 100644
--- a/tour/observers.md
+++ b/tour/observers.md
@@ -2,14 +2,13 @@
 title: Observer Concepts
 ---
 
-Fluo supports complex processing by running user provided code in Observers. Observers are
triggered
-by notifications. An Observer requests that the system run it when a certain column is modified.
When
-another transaction modifies an observed column, it will persist a notification that later
causes
-the Observer to run.  When an Observer is run, its provided with the row and column that
caused it
-to run along with a transaction. Fluo worker processes running across a cluster will execute
-Observers.
+Fluo supports incremental processing with Observers and Notifications.  Notifications are
persistent
+markers set by a transaction that indicate an Observer should run later for a certain row+column.
+Observers are user provided code that are registered to process notifications for a certain
column. When
+an Observer is run, its provided with the row and column that caused it to run along with
a
+transaction. Fluo worker processes running across a cluster will execute Observers.
 
-Since all transactions need to know which columns trigger observers, observers must be registered
+Since all transactions need to know which columns trigger Observers, Observers must be registered
 with Fluo at initialization time.
 
 Fluo supports two type of notifications :
diff --git a/tour/snapshot-isolation.md b/tour/snapshot-isolation.md
index df100aa..a0eab91 100644
--- a/tour/snapshot-isolation.md
+++ b/tour/snapshot-isolation.md
@@ -5,7 +5,7 @@ title: Snapshot Isolation
 Fluo provides Snapshot isolation.  This means that a Transaction or Snapshot can only see
data
 committed before it started.   The following steps demonstrate the concept of snapshot isolation.
Try
 to code up the steps using Fluo, then run it and see if it prints what you expect.  If there
is
-something you are unsure about, code for the following steps is on the next page.
+something you are unsure about, code for the following steps are on the next page.
 
 
  * **Create transaction** *tx1*
diff --git a/tour/weak-code.md b/tour/weak-code.md
index 1f6ffa6..8752ff2 100644
--- a/tour/weak-code.md
+++ b/tour/weak-code.md
@@ -7,45 +7,43 @@ title: Weak Notification Code
   static Column TOTAL_COL = new Column("sum", "total");
   static Column UPDATE_COL = new Column("sum", "update");
 
-  public static class SummingObserver extends AbstractObserver {
-
+  public static class TourObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(NC, NotificationType.WEAK);
-    }
+    public void provide(Registry obsRegistry, Context ctx) {
 
-    @Override
-    public void process(TransactionBase tx, Bytes brow, Column col) throws Exception {
+      // Observers can be written as lambdas
+      StringObserver summingObs = (tx, row, col) -> {
+        int sum = Integer.parseInt(tx.gets(row, TOTAL_COL, "0"));
 
-      String row = brow.toString();
+        CellScanner scanner = tx.scanner().over(Span.prefix(row + "/")).build();
+        for (RowColumnValue rcv : scanner) {
+          sum += Integer.parseInt(rcv.getsValue());
+          tx.delete(rcv.getRow(), rcv.getColumn());
+        }
 
-      int sum = Integer.parseInt(tx.gets(row, TOTAL_COL, "0"));
-
-      CellScanner scanner = tx.scanner().over(Span.prefix(row +"/")).build();
-      for (RowColumnValue rcv : scanner) {
-        sum += Integer.parseInt(rcv.getsValue());
-        tx.delete(rcv.getRow(), rcv.getColumn());
-      }
+        System.out.println("sum : " + sum);
 
-      System.out.println("sum : " + sum);
+        tx.set(row, TOTAL_COL, "" + sum);
+      };
 
-      tx.set(row, TOTAL_COL, ""+sum);
+      obsRegistry.forColumn(NC, NotificationType.WEAK).useObserver(summingObs);
     }
   }
 
   private static void preInit(FluoConfiguration fluoConfig) {
-    fluoConfig.addObserver(new ObserverSpecification(SummingObserver.class.getName()));
+    fluoConfig.setObserverProvider(TourObserverProvider.class);
   }
 
   private static void exercise(MiniFluo mini, FluoClient client) {
     try (LoaderExecutor le = client.newLoaderExecutor()) {
       Random r = new Random(42);
       for (int i = 0; i < 5000; i++) {
-        //The Loader interface only has one function and can therefore be written as a lambda
below.
+        // The Loader interface only has one function and can therefore be written as a lambda
+        // below.
         le.execute((tx, ctx) -> {
-          String row = "counter001/"+String.format("%07d", r.nextInt(10_000_000));
+          String row = "counter001/" + String.format("%07d", r.nextInt(10_000_000));
           int curVal = Integer.parseInt(tx.gets(row, UPDATE_COL, "0"));
-          tx.set(row, UPDATE_COL, curVal+1+"");
+          tx.set(row, UPDATE_COL, curVal + 1 + "");
           tx.setWeakNotification("counter001", NC);
         });
       }
@@ -53,8 +51,8 @@ title: Weak Notification Code
 
     mini.waitForObservers();
 
-    try(Snapshot snap = client.newSnapshot()) {
-      System.out.println("final sum : "+snap.gets("counter001", TOTAL_COL));
+    try (Snapshot snap = client.newSnapshot()) {
+      System.out.println("final sum : " + snap.gets("counter001", TOTAL_COL));
     }
   }
 ```
diff --git a/tour/write-skew.md b/tour/write-skew.md
index 731961b..7e457f4 100644
--- a/tour/write-skew.md
+++ b/tour/write-skew.md
@@ -4,7 +4,7 @@ title: Write Skew
 
 The page on collisions showed that if two transactions overlap and write the same data then
one will
 fail.  However, in the case where two transactions overlap and one reads data that another
is writing
-then both can succeed.  This behavior is called write skew.
+both can succeed.  This behavior is called write skew.
 
 The example below shows write skew.  In the example, *n0* is a node in a tree with two children
*n01*
 and *n02*.  In *tx2*, the sum of *n0* is set to the sum of its children.  However, *tx2*
misses the
@@ -27,5 +27,4 @@ different keys.
 The changes made by *tx3* will not be seen by *tx2*. This behavior is OK if the update made
by *tx3*
 triggers a later update of *n0:data:sum*. Later pages in the tour will show that Observers
can work
 this way, so that eventually the changes made by *tx3* are incorporated.  The [Weak Notification
-Exercise](/tour/weak-notifications/) later in the tour shows an example of an Observer that
works
-like this.
+Exercise](/tour/weak-notifications/) later in the tour shows an example of this.

-- 
To stop receiving notification emails like this one, please contact
['"commits@fluo.apache.org" <commits@fluo.apache.org>'].

Mime
View raw message