From: kturner@apache.org
To: commits@fluo.incubator.apache.org
Date: Mon, 10 Oct 2016 19:33:59 -0000
Message-Id: <089d6fed098d4c72a4d0f1567dc80641@git.apache.org>
Subject: [3/4] incubator-fluo-website git commit: Jekyll build from gh-pages: b359233

http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/76022205/tour/exercise-1/index.html
----------------------------------------------------------------------
diff --git a/tour/exercise-1/index.html b/tour/exercise-1/index.html
new file mode 100644
index 0000000..d4dc119
--- /dev/null
+++ b/tour/exercise-1/index.html
@@ -0,0 +1,545 @@
+ + + + + + + + + + + + + Word count Exercise | Apache Fluo + + + + +
+
+
+ + + +
+

Fluo Tour: Word count Exercise

+

Tour page 19 of 26

+
+
+

This exercise gives you an opportunity to use everything you have learned so +far to attempt writing a simple Fluo application. A bare minimum of code, +along with a conceptual sketch of a solution, is provided to get you started.

+ +

The application should compute word counts for unique documents. This +application should do the following.

+ +
    +
  • Deduplicate content based on hash
  • +
  • Count how many URIs reference content
  • +
  • For the unique words in content, update global word counts.
  • +
  • When new content is added increment the global counts.
  • +
  • When content is no longer referenced by any URIs, decrement the global word counts and delete +that content.
  • +
  • Partition different types of data using row prefixes. Use u: for URIs, use d: for document +content, and use w: for word counts.
  • +
+ +

Part 1 : Loading data.

+ +

The class below is a simple POJO for documents.

+ +
package ft;
+
+import com.google.common.hash.Hashing;
+
+public class Document {
+  public final String uri;
+  public final String content;
+
+  public Document(String uri, String content) {
+    this.uri = uri;
+    this.content = content;
+  }
+
+  public String hash() {
+    //use short prefix of hash for example
+    return Hashing.sha1().hashString(content).toString().substring(0, 7);
+  }
+}
+
+
+ +

The following code loads documents into Fluo. It should do the following :

+ +
    +
  • Keep track of the current hash associated with a URI.
  • +
  • Deduplicate content based on hash
  • +
  • Reference count how many URIs point to content. Track this information in a column named +doc:refc with a row based on the hash.
  • +
  • Track whether content is referenced or unreferenced in a column named doc:refs. +Note that refs is short for reference status. When the reference count for content is 0, this +column's value should be unreferenced. When the reference count is greater than 0, the +doc:refs column's value should be referenced. In a later example, an Observer will watch this +column.
  • +
  • Track the content associated with a hash using the doc:content column.
  • +
+ +

Some of this is implemented below, but not all. The parts that are not done have TODOs.

+ +
package ft;
+
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+
+public class DocLoader implements Loader {
+
+  private final Document doc;
+
+  public static final Column HASH_COL = new Column("uri", "hash");
+  public static final Column REF_COUNT_COL = new Column("doc", "refc");
+  public static final Column REF_STATUS_COL = new Column("doc", "refs");
+  public static final Column CONTENT_COL = new Column("doc", "content");
+
+  public DocLoader(Document doc) {
+    this.doc = doc;
+  }
+
+  @Override
+  public void load(TransactionBase tx, Context context) throws Exception {
+    String newHash = doc.hash();
+    String oldHash = tx.gets("u:" + doc.uri, HASH_COL);
+
+    // TODO check if uri already has the same content hash.  If so, then nothing to do.
+
+    // TODO set the new hash associated with the URI
+
+    if (oldHash != null) {
+      // TODO decrement the reference count at row "d:"+oldHash
+      // TODO set REF_STATUS_COL to "unreferenced" when the reference count goes from 1 to 0. Do
+      // this for row "d:"+oldHash
+    }
+
+    // TODO increment the reference count for the newHash content.
+    // TODO add the new content when the reference count does not exist
+    // TODO set REF_STATUS_COL to "referenced" when the reference count for the new content goes
+    // from 0 to 1.  Do this for row "d:"+newHash
+  }
+}
+
+
+ +
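The TODOs in DocLoader come down to a small amount of reference-count bookkeeping. The sketch below walks through that bookkeeping under loud assumptions: a plain TreeMap keyed by "row column" stands in for the Fluo transaction, and the class name RefCountSketch and its table field are hypothetical. It is not the tour's solution and uses no Fluo API; it only shows the order of the checks.

```java
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the DocLoader bookkeeping; nothing here is Fluo API. */
class RefCountSketch {
  // hypothetical table: key is "<row> <column>", value is the cell value
  final Map<String, String> table = new TreeMap<>();

  void load(String uri, String hash, String content) {
    String uriKey = "u:" + uri + " uri:hash";
    String oldHash = table.get(uriKey);
    if (hash.equals(oldHash)) {
      return; // the URI already points at this content, nothing to do
    }
    table.put(uriKey, hash);

    if (oldHash != null) {
      // the URI stopped referencing the old content
      String oldCountKey = "d:" + oldHash + " doc:refc";
      int oldCount = Integer.parseInt(table.get(oldCountKey)) - 1;
      table.put(oldCountKey, Integer.toString(oldCount));
      if (oldCount == 0) {
        table.put("d:" + oldHash + " doc:refs", "unreferenced");
      }
    }

    String countKey = "d:" + hash + " doc:refc";
    String current = table.get(countKey);
    if (current == null) {
      // first time this content is seen, so store it
      table.put("d:" + hash + " doc:content", content);
    }
    int newCount = (current == null ? 0 : Integer.parseInt(current)) + 1;
    table.put(countKey, Integer.toString(newCount));
    if (newCount == 1) {
      table.put("d:" + hash + " doc:refs", "referenced");
    }
  }

  public static void main(String[] args) {
    RefCountSketch sketch = new RefCountSketch();
    sketch.load("http://news.com/a23", "h1", "first text");
    sketch.load("http://oldnews.com/a23", "h1", "first text");
    sketch.load("http://news.com/a23", "h2", "second text");
    sketch.table.forEach((key, value) -> System.out.println(key + "  " + value));
  }
}
```

In the real loader each table.get and table.put would become a read or write on the transaction, so the whole sequence commits atomically or is retried.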

Add the following to the ft.Main class.

+ +
  // some test data
+  private static Document[] docs1 = new Document[] {
+      new Document("http://news.com/a23",
+          "Jebediah orbits Mun for 35 days.  No power, forgot solar panels."),
+      new Document("http://news.com/a24",
+          "Bill plans to rescue Jebediah after taking tourist to Minimus.")};
+
+  private static Document[] docs2 = new Document[] {new Document("http://oldnews.com/a23",
+      "Jebediah orbits Mun for 35 days.  No power, forgot solar panels.")};
+
+  private static Document[] docs3 = new Document[] {
+      new Document("http://news.com/a23",
+          "Jebediah orbits Mun for 38 days.  No power, forgot solar panels."),
+      new Document("http://news.com/a24",
+          "Crisis at KSC.  Tourist stuck at Minimus.  Bill forgot solar panels.")};
+
+  /**
+   * Utility method for loading documents and printing out Fluo table after load completes.
+   */
+  private static void loadAndPrint(MiniFluo mini, FluoClient client, Document[] docs) {
+
+    try (LoaderExecutor loaderExecutor = client.newLoaderExecutor()) {
+      for (Document document : docs) {
+        loaderExecutor.execute(new DocLoader(document));
+      }
+    } // this will close loaderExecutor and wait for all load transactions to complete
+
+    //This line is not needed in this step of the exercise.  However the next step will need this
+    //line.
+    mini.waitForObservers();
+
+    System.out.println("**** begin table dump ****");
+    try (Snapshot snap = client.newSnapshot()) {
+      snap.scanner().build().forEach(rcv -> System.out.println("  " + rcv));
+    }
+    System.out.println("**** end table dump ****\n");
+  }
+
+  private static void excercise(MiniFluo mini, FluoClient client) {
+    loadAndPrint(mini, client, docs1);
+    loadAndPrint(mini, client, docs2);
+    loadAndPrint(mini, client, docs3);
+  }
+
+
+ +

Once the TODOs in the DocLoader class are implemented, running Main should print out the following.

+ +
**** begin table dump ****
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc refc  1
+  d:a6c4d1f doc refs  referenced
+  d:cf8ddc0 doc content  Bill plans to rescue Jebediah after taking tourist to Minimus.
+  d:cf8ddc0 doc refc  1
+  d:cf8ddc0 doc refs  referenced
+  u:http://news.com/a23 uri hash  a6c4d1f
+  u:http://news.com/a24 uri hash  cf8ddc0
+**** end table dump ****
+
+**** begin table dump ****
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc refc  2
+  d:a6c4d1f doc refs  referenced
+  d:cf8ddc0 doc content  Bill plans to rescue Jebediah after taking tourist to Minimus.
+  d:cf8ddc0 doc refc  1
+  d:cf8ddc0 doc refs  referenced
+  u:http://news.com/a23 uri hash  a6c4d1f
+  u:http://news.com/a24 uri hash  cf8ddc0
+  u:http://oldnews.com/a23 uri hash  a6c4d1f
+**** end table dump ****
+
+**** begin table dump ****
+  d:2732ebc doc content  Crisis at KSC.  Tourist stuck at Minimus.  Bill forgot solar panels.
+  d:2732ebc doc refc  1
+  d:2732ebc doc refs  referenced
+  d:6658252 doc content  Jebediah orbits Mun for 38 days.  No power, forgot solar panels.
+  d:6658252 doc refc  1
+  d:6658252 doc refs  referenced
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc refc  1
+  d:a6c4d1f doc refs  referenced
+  d:cf8ddc0 doc content  Bill plans to rescue Jebediah after taking tourist to Minimus.
+  d:cf8ddc0 doc refc  0
+  d:cf8ddc0 doc refs  unreferenced
+  u:http://news.com/a23 uri hash  6658252
+  u:http://news.com/a24 uri hash  2732ebc
+  u:http://oldnews.com/a23 uri hash  a6c4d1f
+**** end table dump ****
+
+
+ +

Part 2 : Computing word counts.

+ +

Now that you have data loading, create an observer that watches the reference +status column. This observer should increment word counts when new content is +referenced and decrement word counts when content is dereferenced. The +observer should also delete the content when it is dereferenced.

+ +

Make sure you handle the following scenario correctly.

+ +
    +
  • content A becomes referenced
  • +
  • content A becomes unreferenced
  • +
  • an observer runs on content A
  • +
+ +

In this situation, word counts were never incremented for content A so there is no need to decrement +the word counts. One way to handle this is to have a column that tracks if word counts were +incremented.

+ +

Below is a skeleton for an observer to compute word counts.

+ +
package ft;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+public class ContentObserver extends AbstractObserver {
+
+  public static final Column PROCESSED_COL = new Column("doc", "processed");
+  public static final Column WORD_COUNT = new Column("word","docCount");
+
+  /**
+   * Utility method to tokenize the content of a document into unique words.
+   */
+  private Set<String> tokenize(String content) {
+    return new HashSet<String>(Arrays.asList(content.split("[ .!,]+")));
+  }
+
+  /**
+   *  Adds the passed delta to the value for each word.
+   */
+  private void adjustCounts(TransactionBase tx, int delta, Set<String> words) {
+    // TODO make a single call to get all of the current word counts.  Could use
+    //tx.gets(Collection<RowColumn>)
+
+    // TODO for each word, add delta to the current value and set the new value
+  }
+
+
+  @Override
+  public void process(TransactionBase tx, Bytes brow, Column col) throws Exception {
+
+    String row = brow.toString();
+
+    Map<Column, String> colVals =
+        tx.gets(row, DocLoader.CONTENT_COL, DocLoader.REF_STATUS_COL, PROCESSED_COL);
+
+    String content = colVals.get(DocLoader.CONTENT_COL);
+    String status = colVals.get(DocLoader.REF_STATUS_COL);
+    String processed = colVals.getOrDefault(PROCESSED_COL, "false");
+
+    // TODO if status is referenced and not already processed then adjust counts by +1 and set
+    // PROCESSED_COL to true
+
+    // TODO if status is unreferenced then delete all columns for content
+    // TODO if status is unreferenced and document was processed, then adjust counts by -1
+  }
+
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    return new ObservedColumn(DocLoader.REF_STATUS_COL, NotificationType.STRONG);
+  }
+}
+
+
+ +
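The decisions the observer has to make, including the referenced-then-unreferenced scenario described earlier, can be sketched without Fluo. The example below is a hedged illustration only: the class WordCountSketch, its wordCounts map (standing in for the w: rows) and its processed set (standing in for the doc:processed column) are all hypothetical names. Removing a word when its count reaches zero is an assumption chosen to match the table dumps in this exercise, where words from deleted content no longer appear.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** In-memory stand-in for the ContentObserver decisions; nothing here is Fluo API. */
class WordCountSketch {
  // hypothetical stand-ins for the w: rows and the doc:processed column
  final Map<String, Integer> wordCounts = new TreeMap<>();
  final Set<String> processed = new HashSet<>();

  /** Same tokenizer as the skeleton: unique words split on space, '.', '!' and ','. */
  static Set<String> tokenize(String content) {
    return new HashSet<>(Arrays.asList(content.split("[ .!,]+")));
  }

  void adjustCounts(int delta, Set<String> words) {
    for (String word : words) {
      int updated = wordCounts.getOrDefault(word, 0) + delta;
      if (updated == 0) {
        wordCounts.remove(word); // assumption: drop words no content uses any more
      } else {
        wordCounts.put(word, updated);
      }
    }
  }

  /** Mirrors the branches of process() for one piece of content. */
  void process(String hash, String content, String status) {
    if (status.equals("referenced")) {
      if (processed.add(hash)) { // add() returns false if already processed
        adjustCounts(1, tokenize(content));
      }
    } else if (status.equals("unreferenced")) {
      if (processed.remove(hash)) { // only undo counts that were actually applied
        adjustCounts(-1, tokenize(content));
      }
      // the real observer would also delete the content's columns here
    }
  }

  public static void main(String[] args) {
    WordCountSketch sketch = new WordCountSketch();
    sketch.process("a6c4d1f",
        "Jebediah orbits Mun for 35 days.  No power, forgot solar panels.", "referenced");
    sketch.process("cf8ddc0",
        "Bill plans to rescue Jebediah after taking tourist to Minimus.", "referenced");
    sketch.wordCounts.forEach((word, count) -> System.out.println("w:" + word + "  " + count));
  }
}
```

Because the processed marker is checked before every adjustment, content that is unreferenced before the observer ever ran leaves the counts untouched.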

Something to think about: why observe the reference status column instead of the reference count +column?

+ +

When you are ready to run the observer, modify the preInit() method in ft.Main to configure the +observer as follows.

+ +
  private static void preInit(FluoConfiguration fluoConfig) {
+    fluoConfig.addObserver(new ObserverSpecification(ContentObserver.class.getName()));
+  }
+
+
+ +

After implementing the Observer, the output of the program should look like the following.

+ +
**** begin table dump ****
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc processed  true
+  d:a6c4d1f doc refc  1
+  d:a6c4d1f doc refs  referenced
+  d:cf8ddc0 doc content  Bill plans to rescue Jebediah after taking tourist to Minimus.
+  d:cf8ddc0 doc processed  true
+  d:cf8ddc0 doc refc  1
+  d:cf8ddc0 doc refs  referenced
+  u:http://news.com/a23 uri hash  a6c4d1f
+  u:http://news.com/a24 uri hash  cf8ddc0
+  w:35 word docCount  1
+  w:Bill word docCount  1
+  w:Jebediah word docCount  2
+  w:Minimus word docCount  1
+  w:Mun word docCount  1
+  w:No word docCount  1
+  w:after word docCount  1
+  w:days word docCount  1
+  w:for word docCount  1
+  w:forgot word docCount  1
+  w:orbits word docCount  1
+  w:panels word docCount  1
+  w:plans word docCount  1
+  w:power word docCount  1
+  w:rescue word docCount  1
+  w:solar word docCount  1
+  w:taking word docCount  1
+  w:to word docCount  1
+  w:tourist word docCount  1
+**** end table dump ****
+
+**** begin table dump ****
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc processed  true
+  d:a6c4d1f doc refc  2
+  d:a6c4d1f doc refs  referenced
+  d:cf8ddc0 doc content  Bill plans to rescue Jebediah after taking tourist to Minimus.
+  d:cf8ddc0 doc processed  true
+  d:cf8ddc0 doc refc  1
+  d:cf8ddc0 doc refs  referenced
+  u:http://news.com/a23 uri hash  a6c4d1f
+  u:http://news.com/a24 uri hash  cf8ddc0
+  u:http://oldnews.com/a23 uri hash  a6c4d1f
+  w:35 word docCount  1
+  w:Bill word docCount  1
+  w:Jebediah word docCount  2
+  w:Minimus word docCount  1
+  w:Mun word docCount  1
+  w:No word docCount  1
+  w:after word docCount  1
+  w:days word docCount  1
+  w:for word docCount  1
+  w:forgot word docCount  1
+  w:orbits word docCount  1
+  w:panels word docCount  1
+  w:plans word docCount  1
+  w:power word docCount  1
+  w:rescue word docCount  1
+  w:solar word docCount  1
+  w:taking word docCount  1
+  w:to word docCount  1
+  w:tourist word docCount  1
+**** end table dump ****
+
+**** begin table dump ****
+  d:2732ebc doc content  Crisis at KSC.  Tourist stuck at Minimus.  Bill forgot solar panels.
+  d:2732ebc doc processed  true
+  d:2732ebc doc refc  1
+  d:2732ebc doc refs  referenced
+  d:6658252 doc content  Jebediah orbits Mun for 38 days.  No power, forgot solar panels.
+  d:6658252 doc processed  true
+  d:6658252 doc refc  1
+  d:6658252 doc refs  referenced
+  d:a6c4d1f doc content  Jebediah orbits Mun for 35 days.  No power, forgot solar panels.
+  d:a6c4d1f doc processed  true
+  d:a6c4d1f doc refc  1
+  d:a6c4d1f doc refs  referenced
+  u:http://news.com/a23 uri hash  6658252
+  u:http://news.com/a24 uri hash  2732ebc
+  u:http://oldnews.com/a23 uri hash  a6c4d1f
+  w:35 word docCount  1
+  w:38 word docCount  1
+  w:Bill word docCount  1
+  w:Crisis word docCount  1
+  w:Jebediah word docCount  2
+  w:KSC word docCount  1
+  w:Minimus word docCount  1
+  w:Mun word docCount  2
+  w:No word docCount  2
+  w:Tourist word docCount  1
+  w:at word docCount  1
+  w:days word docCount  2
+  w:for word docCount  2
+  w:forgot word docCount  3
+  w:orbits word docCount  2
+  w:panels word docCount  3
+  w:power word docCount  2
+  w:solar word docCount  3
+  w:stuck word docCount  1
+**** end table dump ****
+
+
+ +

Part 3 : Using Fluo Recipes

+ +

The approach to computing word counts above is very prone to transactional collisions, because every piece of content that shares a word updates the same word count cell. One way to avoid +these collisions is to use the CollisionFreeMap provided in Fluo Recipes. Fluo Recipes is +not yet released; this section will be updated with more information once it is.

+ +
+ + + +
+ +


+
+ +
+
+
+ +
+ + + + + + + + +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/76022205/tour/index.html
----------------------------------------------------------------------
diff --git a/tour/index.html b/tour/index.html
index a3065b6..0ef1818 100644
--- a/tour/index.html
+++ b/tour/index.html
@@ -10,7 +10,7 @@
- Apache Fluo Tour | Apache Fluo
+ Fluo Tour | Apache Fluo + + + +
+
+
+ + + +
+

Fluo Tour: Loader Executor

+

Tour page 16 of 26

+
+
+

Fluo provides a simple mechanism to help load data called the LoaderExecutor. Loading data +into Fluo requires a transaction. The LoaderExecutor manages creating, committing, and retrying +transactions when collisions occur. It also runs transactions in multiple threads and batches +commit processing of separate transactions for efficiency. FluoConfiguration provides two methods +for configuring LoaderExecutors: setLoaderQueueSize() and setLoaderThreads().

+ +

Objects that implement Loader are given to a LoaderExecutor. The load() method will +eventually be called on each of these objects, at which point the passed-in transaction can be used to load +data. When close() is called on a LoaderExecutor, it waits for all running and queued work to +finish.

+ +

There is no standalone exercise for the LoaderExecutor. Hands-on experience with it can be +obtained by completing the word count exercise that is a few pages later in +the tour.
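To make the pattern concrete, the following sketch imitates the behaviour described above using only java.util.concurrent. It is not Fluo's LoaderExecutor and says nothing about how Fluo implements it: RetryingExecutorSketch, its Work interface and tryCommit() are hypothetical names, a collision is modelled as tryCommit() returning false, and the work queue is unbounded, whereas Fluo's queue size is configurable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration of run-in-threads, retry-on-collision, wait-on-close; not Fluo code. */
class RetryingExecutorSketch implements AutoCloseable {
  /** Hypothetical unit of work: returns false to signal a collision. */
  interface Work {
    boolean tryCommit();
  }

  private final ExecutorService pool;
  final AtomicInteger retries = new AtomicInteger();

  RetryingExecutorSketch(int threads) {
    pool = Executors.newFixedThreadPool(threads);
  }

  void execute(Work work) {
    pool.execute(() -> {
      while (!work.tryCommit()) {
        retries.incrementAndGet(); // collision, so run the work again
      }
    });
  }

  @Override
  public void close() {
    pool.shutdown(); // stop accepting new work
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES); // wait for queued and running work
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();
    RetryingExecutorSketch executor = new RetryingExecutorSketch(4);
    for (int i = 0; i < 10; i++) {
      // pretend every third attempt collides with another transaction
      executor.execute(() -> attempts.incrementAndGet() % 3 != 0);
    }
    executor.close();
    System.out.println("attempts: " + attempts.get() + "  retries: " + executor.retries.get());
  }
}
```

The caller only submits work and closes the executor; retries stay hidden inside it, which is the convenience the LoaderExecutor offers for load transactions.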

+ + +
+ + + +
+ +


+
+ +
+
+
+ +
+ + + + + + + + +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/76022205/tour/mem-self-ntfy-code/index.html
----------------------------------------------------------------------
diff --git a/tour/mem-self-ntfy-code/index.html b/tour/mem-self-ntfy-code/index.html
new file mode 100644
index 0000000..bb56968
--- /dev/null
+++ b/tour/mem-self-ntfy-code/index.html
@@ -0,0 +1,252 @@
+ + + + + + + + + + + + + Memory limits and self notify code | Apache Fluo + + + + +
+
+
+ + + +
+

Fluo Tour: Memory limits and self notify code

+

Tour page 25 of 26

+
+
+
  static Column NC = new Column("ntyf", "sum");
+  static Column TOTAL_COL = new Column("sum", "total");
+  static Column UPDATE_COL = new Column("sum", "update");
+  static Column CONTINUE_COL = new Column("sum", "continue");
+
+  public static class SummingObserver extends AbstractObserver {
+
+    private int maxToProcess;
+
+    @Override
+    public void init(Context context) throws Exception {
+      //made the max amount to process in a single transaction configurable
+      maxToProcess = context.getObserverConfiguration().getInt("maxToProcess", 100);
+    }
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+      return new ObservedColumn(NC, NotificationType.WEAK);
+    }
+
+    @Override
+    public void process(TransactionBase tx, Bytes brow, Column col) throws Exception {
+
+      String row = brow.toString();
+
+      Map<Column, String> colVals = tx.gets(row, TOTAL_COL, CONTINUE_COL);
+
+      int sum = Integer.parseInt(colVals.getOrDefault(TOTAL_COL, "0"));
+      
+      // construct a scan range that uses the continue row
+      String startRow = colVals.getOrDefault(CONTINUE_COL, row + "/");
+      String endRow = row + "/:"; // after the character '9' comes ':'
+      CellScanner scanner = tx.scanner().over(new Span(startRow, true, endRow, false)).build();
+
+      int processed = 0;
+      
+      for (RowColumnValue rcv : scanner) {
+        if (processed >= maxToProcess) {
+          // stop processing and set the continue row
+          tx.set(row, CONTINUE_COL, rcv.getsRow());
+          tx.setWeakNotification(brow, col);
+          break;
+        }
+        sum += Integer.parseInt(rcv.getsValue());
+        tx.delete(rcv.getRow(), rcv.getColumn());
+        processed++;
+      }
+
+      System.out.println("sum : " + sum + "  start: " + startRow + "  processed: " + processed);
+
+      tx.set(row, TOTAL_COL, "" + sum);
+
+      // if did not set the continue column and it exists, then delete it
+      if (processed < maxToProcess && colVals.containsKey(CONTINUE_COL)) {
+        tx.delete(row, CONTINUE_COL);
+        // need to start over at the beginning and see if there is new data before the continue
+        // column
+        tx.setWeakNotification(brow, col);
+      }
+    }
+  }
+
+  private static void preInit(FluoConfiguration fluoConfig) {
+    ObserverSpecification ospec = new ObserverSpecification(SummingObserver.class.getName());
+    ospec.getConfiguration().setProperty("maxToProcess", 500);
+    fluoConfig.addObserver(ospec);
+  }
+
+  private static void excercise(MiniFluo mini, FluoClient client) {
+    try (LoaderExecutor le = client.newLoaderExecutor()) {
+      Random r = new Random(42);
+      for (int i = 0; i < 5000; i++) {
+        // The Loader interface only has one function and can therefore be written as a lambda
+        // below.
+        le.execute((tx, ctx) -> {
+          String row = "counter001/" + String.format("%07d", r.nextInt(10000000));
+          int curVal = Integer.parseInt(tx.gets(row, UPDATE_COL, "0"));
+          tx.set(row, UPDATE_COL, curVal + 1 + "");
+          tx.setWeakNotification("counter001", NC);
+        });
+      }
+    }
+
+    mini.waitForObservers();
+
+    try (Snapshot snap = client.newSnapshot()) {
+      System.out.println("final sum : " + snap.gets("counter001", TOTAL_COL));
+    }
+  }
+
+
+ +

The code above will print something like the following.

+ +
$ mvn -q clean compile exec:java
+Starting MiniFluo ... started.
+sum : 500  start: counter001/  processed: 500
+sum : 891  start: counter001/7945963  processed: 390
+sum : 1391  start: counter001/  processed: 500
+sum : 1891  start: counter001/2938489  processed: 500
+sum : 2391  start: counter001/5210523  processed: 500
+sum : 2892  start: counter001/6912090  processed: 500
+sum : 3392  start: counter001/8410312  processed: 500
+sum : 3398  start: counter001/9991522  processed: 6
+sum : 3898  start: counter001/  processed: 500
+sum : 4398  start: counter001/1824962  processed: 500
+sum : 4899  start: counter001/4076664  processed: 500
+sum : 5000  start: counter001/6993690  processed: 101
+sum : 5000  start: counter001/  processed: 0
+final sum : 5000
+
+
+ + +
+ + + +
+ +


+
+ +
+
+
+ +
+ + + + + + + + +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/76022205/tour/mem-self-ntfy/index.html
----------------------------------------------------------------------
diff --git a/tour/mem-self-ntfy/index.html b/tour/mem-self-ntfy/index.html
new file mode 100644
index 0000000..b19103a
--- /dev/null
+++ b/tour/mem-self-ntfy/index.html
@@ -0,0 +1,159 @@
+ + + + + + + + + + + + + Memory limits and self notify | Apache Fluo + + + + +
+
+
+ + + +
+

Fluo Tour: Memory limits and self notify

+

Tour page 24 of 26

+
+
+

All modifications made as part of a transaction must fit into memory because the sets and deletes +are buffered in memory until commit. If there is more data to process than will fit in memory, one +way to handle this is to process some data and self notify.

+ +

As an exercise try modifying the weak notification exercise and +making it self notify. Modify the observer such that it does the following :

+ +
    +
  • Process a maximum number of updates. This limit could be made configurable using per-observer +configuration.
  • +
  • When the max is reached : +
      +
    • Stop processing.
    • +
    • Record the stop row.
    • +
    • Notify self.
    • +
    +
  • +
  • Use the row where it previously stopped when creating the scan range.
  • +
  • Delete the stop row, if it exists, after scanning to the end of the range.
  • +
+ +
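The steps in the list above can be tried out in memory before touching Fluo. The sketch below is an illustration under stated assumptions, not the tour's solution: a TreeMap stands in for the sorted update rows, the fields continueRow and notified stand in for the stored stop row and the weak notification, and the class name SelfNotifySketch is hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of bounded processing with a stop row and self notification. */
class SelfNotifySketch {
  // hypothetical stand-ins: sorted pending updates plus the observer's saved state
  final TreeMap<String, Integer> updates = new TreeMap<>();
  String continueRow = null; // row where the previous pass stopped, if any
  int total = 0;
  boolean notified = false; // models the weak notification on self

  /** Processes at most maxToProcess updates and sets notified if another pass is needed. */
  void process(int maxToProcess) {
    notified = false;
    boolean resumed = continueRow != null;
    // start the scan at the stop row when one was recorded
    Map<String, Integer> range = resumed ? updates.tailMap(continueRow, true) : updates;

    int processed = 0;
    String stoppedAt = null;
    Iterator<Map.Entry<String, Integer>> it = range.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Integer> entry = it.next();
      if (processed >= maxToProcess) {
        stoppedAt = entry.getKey(); // limit reached, remember where to resume
        break;
      }
      total += entry.getValue();
      it.remove(); // the update has been folded into the total
      processed++;
    }

    if (stoppedAt != null) {
      continueRow = stoppedAt;
      notified = true;
    } else if (resumed) {
      // reached the end of the range: clear the stop row and rescan from the start
      continueRow = null;
      notified = true;
    }
  }

  public static void main(String[] args) {
    SelfNotifySketch sketch = new SelfNotifySketch();
    for (int i = 0; i < 25; i++) {
      sketch.updates.put(String.format("counter001/%07d", i), 1);
    }
    do {
      sketch.process(10);
      System.out.println("total: " + sketch.total + "  continue: " + sketch.continueRow);
    } while (sketch.notified);
  }
}
```

The final rescan from the start matters because new updates may have arrived in rows that sort before the stop row while earlier passes were running.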
+ + + +
+ +


+
+ +
+
+
+ +
+ + + + + + + + +