From: mwalch@apache.org
To: commits@fluo.incubator.apache.org
Date: Fri, 28 Oct 2016 16:57:44 -0000
Subject: [7/8] incubator-fluo-website git commit: Jekyll build from gh-pages:887634d

http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/f6125fa2/docs/fluo-recipes/1.0.0-incubating/export-queue/index.html
----------------------------------------------------------------------
diff --git a/docs/fluo-recipes/1.0.0-incubating/export-queue/index.html b/docs/fluo-recipes/1.0.0-incubating/export-queue/index.html
new file mode 100644
index 0000000..8bf3399
--- /dev/null
+++ b/docs/fluo-recipes/1.0.0-incubating/export-queue/index.html
@@ -0,0 +1,409 @@
Export Queue Recipe | Apache Fluo
+
+
+
+ + + + + +
+

Export Queue Recipe

+
+ +
+

Background

+ +

Fluo is not suited for servicing low latency queries for two reasons. The first reason is that the implementation of transactions is designed for throughput. To get throughput, transactions recover lazily from failures and may wait on another transaction that is writing. Both of these design decisions can lead to delays for an individual transaction, but do not negatively impact throughput. The second reason is that Fluo observers executing transactions will likely cause a large number of random accesses. This could lead to high response time variability for an individual random access. This variability would not impede throughput, but it would impede the goal of low latency.

+ +

One way to make data transformed by Fluo available for low latency queries is to export that data to another system. For example, Fluo could be running on cluster A, continually transforming a large data set, and exporting data to Accumulo tables on cluster B. The tables on cluster B would service user queries. Fluo Recipes has built-in support for exporting to Accumulo; however, this recipe could be used to export to systems other than Accumulo, like Redis, Elasticsearch, MySQL, etc.

+ +

Exporting data from Fluo is easy to get wrong, which is why this recipe exists. To understand what can go wrong, consider the following example observer transaction.

+ +
public class MyObserver extends AbstractObserver {
+
+    private static final TypeLayer TYPEL = new TypeLayer(new StringEncoder());
+
+    //represents a query system external to Fluo that is updated by Fluo
+    QuerySystem querySystem;
+
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) {
+
+        TypedTransactionBase ttx = TYPEL.wrap(tx);
+        int oldCount = ttx.get().row(row).fam("meta").qual("counter1").toInteger(0);
+        int numUpdates = ttx.get().row(row).fam("meta").qual("numUpdates").toInteger(0);
+        int newCount = oldCount + numUpdates;
+        ttx.mutate().row(row).fam("meta").qual("counter1").set(newCount);
+        ttx.mutate().row(row).fam("meta").qual("numUpdates").set(0);
+
+        //Build an inverted index in the query system, based on count from the
+        //meta:counter1 column in fluo.  Do this by creating rows for the
+        //external query system based on the count.        
+        String oldCountRow = String.format("%06d", oldCount);
+        String newCountRow = String.format("%06d", newCount);
+        
+        //add a new entry to the inverted index
+        querySystem.insertRow(newCountRow, row);
+        //remove the old entry from the inverted index
+        querySystem.deleteRow(oldCountRow, row);
+    }
+}
+
+
+ +

The above example would keep the external index up to date beautifully as long as the following conditions are met.

+ +
  • Threads executing transactions always complete successfully.
  • Only a single thread ever responds to a notification.
+ +

However, these conditions are not guaranteed by Fluo. Multiple threads may attempt to process a notification concurrently (only one may succeed). Also, at any point in time a transaction may fail (for example, the computer executing it may reboot). Both of these problems will occur and will lead to corruption of the external index in the example. The inverted index and Fluo will become inconsistent. The inverted index will end up with multiple entries (that are never cleaned up) for a single entity even though the intent is to only have one.

+ +

The root of the problem in the example above is that it exports uncommitted data. There is no guarantee that setting the column <row>:meta:counter1 to newCount will succeed until the transaction is successfully committed. However, newCountRow is derived from newCount and written to the external query system before the transaction is committed (note: for observers, the transaction is committed by the framework after process(...) is called). So if the transaction fails, the next time it runs it could compute a completely different value for newCountRow (and it would not delete what was written by the failed transaction).
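To make the failure mode concrete, here is a hypothetical timeline for the example above (the counts are made-up values, used only for illustration):

// Hypothetical failure timeline for the naive observer above (illustrative values only):
// t1: observer reads oldCount=5 and numUpdates=2, computes newCount=7
// t2: observer writes row "000007" to the external query system (this external write is immediate)
// t3: the worker dies before the Fluo transaction commits, so meta:counter1 is still 5
// t4: the notification is processed again later; by then numUpdates has grown to 3
// t5: the rerun reads oldCount=5, computes newCount=8, and writes row "000008" externally
// Result: the external index now holds entries for both "000007" and "000008" for the same
//         entity, and nothing will ever delete the stale "000007" entry.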

+ +

Solution

+ +

The simple solution to the problem of exporting uncommitted data is to only export committed data. There are multiple ways to accomplish this. This recipe offers a reusable implementation of one method. This recipe has the following elements:

+ +
  • An export queue that transactions can add key/values to. Only if the transaction commits successfully will the key/value end up in the queue. A Fluo application can have multiple export queues, and each one must have a unique id.
  • When a key/value is added to the export queue, it is given a sequence number. This sequence number is based on the transaction's start timestamp.
  • Each export queue is configured with an observer that processes key/values that were successfully committed to the queue.
  • When key/values in an export queue are processed, they are deleted so the export queue does not keep any long term data.
  • Key/values in an export queue are placed in buckets. This is done so that all of the updates in a bucket can be processed in a single transaction. This allows an efficient implementation of this recipe in Fluo. It can also lead to efficiency in the system being exported to, if that system can benefit from batching updates. The number of buckets in an export queue is configurable.
+ +

There are three requirements for using this recipe:

+ +
  • Export queues must be configured before initializing a Fluo application.
  • Transactions adding to an export queue must get an instance of the queue using its unique queue id.
  • A class that extends Exporter must be implemented in order to process exports.
+ +

Schema

+ +

Each export queue stores its data in the Fluo table in a contiguous row range. This row range is defined by using the export queue id as a row prefix for all data in the export queue. So the row range defined by the export queue id should not be used by anything else.

+ +

All data stored in an export queue is transient. When an export queue is configured, it will recommend split points using the table optimization process. The number of splits generated by this process can be controlled by setting the number of buckets per tablet when configuring an export queue.

+ +

Example Use

+ +

This example will show how to build an inverted index in an external query system using an export queue. The class below is a simple POJO that holds a count update; it will be used as the value for the export queue.

+ +
class CountUpdate {
+  public int oldCount;
+  public int newCount;
+  
+  public CountUpdate(int oc, int nc) {
+    this.oldCount = oc;
+    this.newCount = nc;
+  }
+}
+
+
+ +

The following code shows how to configure an export queue. This code will modify the FluoConfiguration object with the options needed for the export queue. This FluoConfiguration object should be used to initialize the Fluo application.

+ +
   FluoConfiguration fluoConfig = ...;
+
+   //queue id "ici" means inverted count index, would probably use static final in real application
+   String exportQueueId = "ici";
+   Class<CountExporter> exporterType = CountExporter.class;
+   Class<Bytes> keyType = Bytes.class;
+   Class<CountUpdate> valueType = CountUpdate.class;
+   int numBuckets = 1009;
+   //the desired number of tablets to create when applying table optimizations
+   int numTablets = 100;
+
+   ExportQueue.Options eqOptions =
+        new ExportQueue.Options(exportQueueId, exporterType, keyType, valueType, numBuckets)
+          .setBucketsPerTablet(numBuckets/numTablets);
+   ExportQueue.configure(fluoConfig, eqOptions);
+
+   //initialize Fluo using fluoConfig
+
+
+ +

Below is an updated version of the observer from above that's now using the export queue.

+ +
public class MyObserver extends AbstractObserver {
+
+    private static final TypeLayer TYPEL = new TypeLayer(new StringEncoder());
+
+    private ExportQueue<Bytes, CountUpdate> exportQueue;
+
+    @Override
+    public void init(Context context) throws Exception {
+      exportQueue = ExportQueue.getInstance("ici", context.getAppConfiguration());
+    }
+
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) {
+
+        TypedTransactionBase ttx = TYPEL.wrap(tx);
+        int oldCount = ttx.get().row(row).fam("meta").qual("counter1").toInteger(0);
+        int numUpdates = ttx.get().row(row).fam("meta").qual("numUpdates").toInteger(0);
+        int newCount = oldCount + numUpdates;
+        ttx.mutate().row(row).fam("meta").qual("counter1").set(newCount);
+        ttx.mutate().row(row).fam("meta").qual("numUpdates").set(0);
+
+        //Because the update to the export queue is part of the transaction,
+        //either the update to meta:counter1 is made and an entry is added to
+        //the export queue or neither happens.
+        exportQueue.add(tx, row, new CountUpdate(oldCount, newCount));
+    }
+}
+
+
+ +

The observer setup for the export queue will call the processExports() method on the class below to process entries in the export queue. It is possible for the call to processExports() to fail part way through and/or be called multiple times. In the case of failures, the exporter will eventually be called again with the exact same data. The possibility of the same export entry being processed on multiple computers at different times can cause exports to arrive out of order. The system receiving exports has to be resilient to data arriving out of order and multiple times. The purpose of the sequence number is to help systems receiving data from Fluo process out of order and redundant data.

+ +
  public class CountExporter extends Exporter<Bytes, CountUpdate> {
+    //represents the external query system we want to update from Fluo
+    QuerySystem querySystem;
+
+    @Override
+    protected void processExports(Iterator<SequencedExport<Bytes, CountUpdate>> exportIterator) {
+      BatchUpdater batchUpdater = querySystem.getBatchUpdater();
+      while(exportIterator.hasNext()){
+        SequencedExport<Bytes, CountUpdate> exportEntry = exportIterator.next();
+        Bytes row = exportEntry.getKey();
+        CountUpdate uc = exportEntry.getValue();
+        long seqNum = exportEntry.getSequence();
+
+        String oldCountRow = String.format("%06d", uc.oldCount);
+        String newCountRow = String.format("%06d", uc.newCount);
+
+        //add a new entry to the inverted index
+        batchUpdater.insertRow(newCountRow, row, seqNum);
+        //remove the old entry from the inverted index
+        batchUpdater.deleteRow(oldCountRow, row, seqNum);
+      }
+
+      //flush all of the updates to the external query system
+      batchUpdater.close();
+    }
+  }
+
+
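The recipe does not dictate how the receiving system uses the sequence number. One common approach (a hypothetical sketch, not part of Fluo Recipes) is last-writer-wins: store the highest sequence number seen for each entry and ignore anything older.

  //Hypothetical sketch of a receiving system that tolerates redundant and out of order
  //deliveries.  seqStore and rowStore are made-up abstractions of the external system's
  //storage; they are not part of Fluo Recipes.
  void applyInsert(String indexRow, Bytes fluoRow, long seqNum) {
    Long lastSeq = seqStore.get(indexRow, fluoRow);
    if (lastSeq == null || seqNum > lastSeq) {
      rowStore.insertRow(indexRow, fluoRow);
      seqStore.put(indexRow, fluoRow, seqNum);
    }
    // else: this update is older than one already applied, so drop it
  }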
+ +

Concurrency

+ +

Additions to the export queue will never collide. If two transactions add the same key at around the same time and successfully commit, then two entries with different sequence numbers will always be added to the queue. The sequence number is based on the start timestamp of the transactions.
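As a hypothetical illustration (timestamps invented for this example):

// Suppose tx1 has start timestamp 1000 and tx2 has start timestamp 1007, and both commit:
//   tx1: exportQueue.add(tx1, key, valA)  ->  queue entry (key, seq=1000, valA)
//   tx2: exportQueue.add(tx2, key, valB)  ->  queue entry (key, seq=1007, valB)
// Both entries end up in the queue; the additions themselves never conflict.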

+ +

If the key used to add items to the export queue is deterministically derived from something the transaction is writing to, then that will cause a collision. For example, consider the following interleaving of two transactions adding to the same export queue in a manner that will collide. Note: TH1 is shorthand for thread 1, ek() is a function that creates the export key, and ev() is a function that creates the export value.

+ +
  1. TH1 : key1 = ek(row1,fam1:qual1)
  2. TH1 : val1 = ev(tx1.get(row1,fam1:qual1), tx1.get(rowA,fam1:qual2))
  3. TH1 : exportQueueA.add(tx1, key1, val1)
  4. TH2 : key2 = ek(row1,fam1:qual1)
  5. TH2 : val2 = ev(tx2.get(row1,fam1:qual1), tx2.get(rowB,fam1:qual2))
  6. TH2 : exportQueueA.add(tx2, key2, val2)
  7. TH1 : tx1.set(row1,fam1:qual1, val1)
  8. TH2 : tx2.set(row1,fam1:qual1, val2)
+ +

In the example above, only one transaction will succeed because both are setting row1 fam1:qual1. Since adding to the export queue is part of the transaction, only the transaction that succeeds will add something to the queue. If the function ek() in the example is deterministic, then both transactions would have been trying to add the same key to the export queue.

+ +

With the above method, we know that transactions adding entries to the queue for the same key must have executed serially. Knowing that transactions which added the same key did not overlap in time makes reasoning about those export entries very simple.

+ +

The example below is a slight modification of the example above. In this example both transactions will successfully add entries to the queue using the same key. Both transactions succeed because they are writing to different cells (rowB fam1:qual2 and rowA fam1:qual2). This approach makes it more difficult to reason about export entries with the same key, because the transactions adding those entries could have overlapped in time. This is an example of the write skew mentioned in the Percolator paper.

+ +
  1. TH1 : key1 = ek(row1,fam1:qual1)
  2. TH1 : val1 = ev(tx1.get(row1,fam1:qual1), tx1.get(rowA,fam1:qual2))
  3. TH1 : exportQueueA.add(tx1, key1, val1)
  4. TH2 : key2 = ek(row1,fam1:qual1)
  5. TH2 : val2 = ev(tx2.get(row1,fam1:qual1), tx2.get(rowB,fam1:qual2))
  6. TH2 : exportQueueA.add(tx2, key2, val2)
  7. TH1 : tx1.set(rowA,fam1:qual2, val1)
  8. TH2 : tx2.set(rowB,fam1:qual2, val2)
+ + +
+ +
+ +
+
+
+ +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/f6125fa2/docs/fluo-recipes/1.0.0-incubating/index.html
----------------------------------------------------------------------
diff --git a/docs/fluo-recipes/1.0.0-incubating/index.html b/docs/fluo-recipes/1.0.0-incubating/index.html
new file mode 100644
index 0000000..dbed369
--- /dev/null
+++ b/docs/fluo-recipes/1.0.0-incubating/index.html
@@ -0,0 +1,202 @@

Fluo Recipes 1.0.0-incubating Documentation | Apache Fluo
+
+
+
+ + + + + +
+

Fluo Recipes 1.0.0-incubating Documentation

+
+ +
+

Fluo Recipes are common code for Apache Fluo application developers.

+ +

Fluo Recipes build on the Fluo API to offer additional functionality to developers. They are published separately from Fluo on their own release schedule. This allows Fluo Recipes to iterate and innovate faster than Fluo (which will maintain a more minimal API on a slower release cycle).

+ +

Documentation

+ +

Recipes are documented below and in the Recipes API docs.

+ +
  • Collision Free Map - A recipe for making many to many updates.
  • Export Queue - A recipe for exporting data from Fluo to external systems.
  • Row Hash Prefix - A recipe for spreading data evenly in a row prefix.
  • RecordingTransaction - A wrapper for a Fluo transaction that records all transaction operations to a log, which can be used to export data from Fluo.
  • Testing - Some code to help write Fluo integration tests.
+ +

Recipes have common needs that are broken down into the following reusable components.

+ + + +

Usage

+ +

The Fluo Recipes project publishes multiple jars to Maven Central for each release. The fluo-recipes-core jar is the primary jar. It is where most recipes live and where they are placed by default if they have minimal dependencies beyond the Fluo API.

+ +

Fluo Recipes with dependencies that bring in many transitive dependencies publish their own jar. For example, recipes that depend on Apache Spark are published in the fluo-recipes-spark jar. If you don’t plan on using code in the fluo-recipes-spark jar, you should avoid including it in your pom.xml to avoid a transitive dependency on Spark.

+ +

Below is a sample Maven POM containing all possible Fluo Recipes dependencies:

+ +
  <properties>
+    <fluo-recipes.version>1.0.0-incubating</fluo-recipes.version>
+  </properties>
+
+  <dependencies>
+    <!-- Required. Contains recipes that only depend on the Fluo API -->
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-recipes-core</artifactId>
+      <version>${fluo-recipes.version}</version>
+    </dependency>
+    <!-- Optional. Serialization code that depends on Kryo -->
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-recipes-kryo</artifactId>
+      <version>${fluo-recipes.version}</version>
+    </dependency>
+    <!-- Optional. Common code for using Fluo with Accumulo -->
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-recipes-accumulo</artifactId>
+      <version>${fluo-recipes.version}</version>
+    </dependency>
+    <!-- Optional. Common code for using Fluo with Spark -->
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-recipes-spark</artifactId>
+      <version>${fluo-recipes.version}</version>
+    </dependency>
+    <!-- Optional. Common code for writing Fluo integration tests -->
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-recipes-test</artifactId>
+      <version>${fluo-recipes.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+ + +
+ +
+ +
+
+
+ +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/f6125fa2/docs/fluo-recipes/1.0.0-incubating/recording-tx/index.html
----------------------------------------------------------------------
diff --git a/docs/fluo-recipes/1.0.0-incubating/recording-tx/index.html b/docs/fluo-recipes/1.0.0-incubating/recording-tx/index.html
new file mode 100644
index 0000000..1580e2c
--- /dev/null
+++ b/docs/fluo-recipes/1.0.0-incubating/recording-tx/index.html
@@ -0,0 +1,187 @@

RecordingTransaction recipe | Apache Fluo
+
+
+
+ + + + + +
+

RecordingTransaction recipe

+
+ +
+

A RecordingTransaction is an implementation of Transaction that logs all transaction operations (i.e., GET, SET, or DELETE) to a TxLog object for later uses such as exporting data. The code below shows how a RecordingTransaction is created by wrapping a Transaction object:

+ +
RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+
+
+ +

A predicate function can be passed to the wrap method to select which log entries to record. The code below only records log entries whose column family is meta:

+ +
RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx,
+                               le -> le.getColumn().getFamily().toString().equals("meta"));
+
+
+ +

After creating a RecordingTransaction, users can use it as they would use a Transaction object.

+ +
Bytes value = rtx.get(Bytes.of("r1"), new Column("cf1", "cq1"));
+
+
+ +

While SET or DELETE operations are always recorded to the log, GET operations are only recorded if a value was found at the requested row/column. Also, if a GET method returns an iterator, only the GET operations that are retrieved from the iterator are logged. GET operations are logged because they are necessary if you want to determine the changes made by the transaction.

+ +

When you are done operating on the transaction, you can retrieve the TxLog using the following code:

+ +
TxLog myTxLog = rtx.getTxLog();
+
+
+ +
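If you need to inspect the recorded operations directly (for example, to build an export value), the TxLog entries can be iterated. The sketch below assumes TxLog exposes a list of LogEntry objects with accessors along the lines of getOp(), getRow(), and getColumn(); check the Recipes API docs for the exact names.

// A rough sketch, not verbatim API -- the method names on TxLog/LogEntry are assumptions.
for (LogEntry entry : myTxLog.getLogEntries()) {
  System.out.println(entry.getOp() + " " + entry.getRow() + " " + entry.getColumn());
}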

Below is example code showing how a RecordingTransaction can be used in an observer to record all operations performed by the transaction in a TxLog. In this example, a GET (if data exists) and a SET operation will be logged. This TxLog can be added to an export queue and later used to export updates from Fluo.

+ +
public class MyObserver extends AbstractObserver {
+
+    private static final TypeLayer TYPEL = new TypeLayer(new StringEncoder());
+    
+    private ExportQueue<Bytes, TxLog> exportQueue;
+
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) {
+
+        // create recording transaction (rtx)
+        RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+        
+        // use rtx to create a typed transaction & perform operations
+        TypedTransactionBase ttx = TYPEL.wrap(rtx);
+        int count = ttx.get().row(row).fam("meta").qual("counter1").toInteger(0);
+        ttx.mutate().row(row).fam("meta").qual("counter1").set(count+1);
+        
+        // when finished performing operations, retrieve transaction log
+        TxLog txLog = rtx.getTxLog();
+
+        // add txLog to exportQueue if not empty
+        if (!txLog.isEmpty()) {
+          //do not pass rtx to exportQueue.add()
+          exportQueue.add(tx, row, txLog);
+        }
+    }
+}
+
+
+ +
+ +
+ +
+
+
+ +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/f6125fa2/docs/fluo-recipes/1.0.0-incubating/row-hasher/index.html
----------------------------------------------------------------------
diff --git a/docs/fluo-recipes/1.0.0-incubating/row-hasher/index.html b/docs/fluo-recipes/1.0.0-incubating/row-hasher/index.html
new file mode 100644
index 0000000..04245f6
--- /dev/null
+++ b/docs/fluo-recipes/1.0.0-incubating/row-hasher/index.html
@@ -0,0 +1,235 @@

Row hash prefix recipe | Apache Fluo
+
+
+
+ + + + + +
+

Row hash prefix recipe

+
+ +
+

Background

+ +

Transactions are implemented in Fluo using conditional mutations. Conditional mutations require server side processing on tservers. If data is not spread evenly, it can cause some tservers to execute more conditional mutations than others. These tservers doing more work can become a bottleneck. Most real world data is not uniform and can cause this problem.

+ +

Before the Fluo Webindex example started using this recipe, it suffered from this problem. The example was using reverse dns encoded URLs for row keys like p:com.cnn/story1.html. This made certain portions of the table more popular, which in turn made some tservers do much more work. This uneven distribution of work led to lower throughput and uneven performance. Using this recipe made those problems go away.

+ +

Solution

+ +

This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe, rows are structured like the following.

+ +
<prefix>:<fixed len row hash>:<user row>
+
+
+ +

The recipe also provides code to help generate split points and configure +balancing of the prefix.

+ +

Example Use

+ +
import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.data.RowHasher;
+
+public class RowHasherExample {
+
+
+  private static final RowHasher PAGE_ROW_HASHER = new RowHasher("p");
+
+  // Provide one place to obtain row hasher.
+  public static RowHasher getPageRowHasher() {
+    return PAGE_ROW_HASHER;
+  }
+
+  public static void main(String[] args) {
+    RowHasher pageRowHasher = getPageRowHasher();
+
+    String revUrl = "org.wikipedia/accumulo";
+
+    // Add a hash prefix to the row. Use this hashedRow in your transaction
+    Bytes hashedRow = pageRowHasher.addHash(revUrl);
+    System.out.println("hashedRow      : " + hashedRow);
+
+    // Remove the prefix. This can be used by transactions dealing with the hashed row.
+    Bytes orig = pageRowHasher.removeHash(hashedRow);
+    System.out.println("orig           : " + orig);
+
+
+    // Generate table optimizations for the recipe. This can be called when setting up an
+    // application that uses a hashed row.
+    int numTablets = 20;
+
+    // The following code would normally be called before initializing Fluo. This code
+    // registers table optimizations for your prefix+hash.
+    FluoConfiguration conf = new FluoConfiguration();
+    RowHasher.configure(conf, PAGE_ROW_HASHER.getPrefix(), numTablets);
+
+    // Normally you would not call the following code, it would be called automatically for you by
+    // TableOperations.optimizeTable(). Calling this code here to show what table optimization will
+    // be generated.
+    TableOptimizations tableOptimizations = new RowHasher.Optimizer()
+        .getTableOptimizations(PAGE_ROW_HASHER.getPrefix(), conf.getAppConfiguration());
+    System.out.println("Balance config : " + tableOptimizations.getTabletGroupingRegex());
+    System.out.println("Splits         : ");
+    tableOptimizations.getSplits().forEach(System.out::println);
+    System.out.println();
+  }
+}
+
+
+ +

The example program above prints the following.

+ +
hashedRow      : p:1yl0:org.wikipedia/accumulo
+orig           : org.wikipedia/accumulo
+Balance config : (\Qp:\E).*
+Splits         : 
+p:1sst
+p:3llm
+p:5eef
+p:7778
+p:9001
+p:assu
+p:clln
+p:eeeg
+p:g779
+p:i002
+p:jssv
+p:lllo
+p:neeh
+p:p77a
+p:r003
+p:sssw
+p:ullp
+p:weei
+p:y77b
+p:~
+
+
+ +

The split points are used to create tablets in the Accumulo table used by Fluo. Data and computation will spread very evenly across these tablets. The Balancing config will spread the tablets evenly across the tablet servers, which will spread the computation evenly. See the table optimizations documentation for information on how to apply the optimizations.
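For completeness, the optimizations are normally applied by calling TableOperations.optimizeTable() (from the fluo-recipes-accumulo module, as noted in the example above) after the application is initialized. A minimal sketch follows; the argument list here is an assumption, so verify the exact signature in the Recipes API docs.

  //A hedged sketch: apply the registered optimizations to the Accumulo table backing Fluo.
  //The exact optimizeTable signature is an assumption -- check the fluo-recipes-accumulo javadoc.
  FluoConfiguration fluoConfig = ...;   //the same config used to initialize the application
  TableOperations.optimizeTable(fluoConfig, tableOptimizations);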

+ + +
+ +
+ +
+
+
+ +
http://git-wip-us.apache.org/repos/asf/incubator-fluo-website/blob/f6125fa2/docs/fluo-recipes/1.0.0-incubating/serialization/index.html
----------------------------------------------------------------------
diff --git a/docs/fluo-recipes/1.0.0-incubating/serialization/index.html b/docs/fluo-recipes/1.0.0-incubating/serialization/index.html
new file mode 100644
index 0000000..0169b06
--- /dev/null
+++ b/docs/fluo-recipes/1.0.0-incubating/serialization/index.html
@@ -0,0 +1,188 @@

Serializing Data | Apache Fluo
+
+
+
+ + + + + +
+

Serializing Data

+
+ +
+

Various Fluo Recipes deal with POJOs and need to serialize them. The serialization mechanism is configurable and defaults to using Kryo.

+ +

Custom Serialization

+ +

In order to use a custom serialization method, two steps need to be taken. The first step is to implement SimpleSerializer. The second step is to configure Fluo Recipes to use the custom implementation. This needs to be done before initializing Fluo. Below is an example of how to do this.

+ +
  FluoConfiguration fluoConfig = ...;
+  //assume MySerializer implements SimpleSerializer
+  SimpleSerializer.setSerializer(fluoConfig, MySerializer.class);
+  //initialize Fluo using fluoConfig
+
+
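For reference, here is a rough sketch of what such a custom implementation might look like. The interface methods and packages below are assumptions based on the SimpleSerializer interface in fluo-recipes-core (init, serialize, deserialize); verify them against the javadoc, and substitute your own serialization library for the plain Java serialization used here.

  package com.foo;

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;

  import org.apache.fluo.api.config.SimpleConfiguration;
  import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

  //Hypothetical example -- method signatures are assumptions, check the SimpleSerializer javadoc.
  public class MySerializer implements SimpleSerializer {

    @Override
    public void init(SimpleConfiguration appConfig) {
      //read any custom options from the application configuration here
    }

    @Override
    public <T> byte[] serialize(T obj) {
      //plain Java serialization (requires Serializable objects), used only to keep the sketch self contained
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
          ObjectOutputStream oos = new ObjectOutputStream(baos)) {
        oos.writeObject(obj);
        oos.flush();
        return baos.toByteArray();
      } catch (IOException e) {
        throw new IllegalStateException(e);
      }
    }

    @Override
    public <T> T deserialize(byte[] serialized, Class<T> clazz) {
      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
        return clazz.cast(ois.readObject());
      } catch (IOException | ClassNotFoundException e) {
        throw new IllegalStateException(e);
      }
    }
  }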
+ +

Kryo Factory

+ +

If using the default Kryo serializer implementation, then creating a KryoFactory implementation can lead to smaller serialization size. When Kryo serializes an object graph, it will by default include the fully qualified names of the classes in the serialized data. This can be avoided by registering classes that will be serialized. Registration is done by creating a KryoFactory and then configuring Fluo Recipes to use it. The example below shows how to do this.

+ +

For example, assume the POJOs named Node and Edge will be serialized and need to be registered with Kryo. This could be done by creating a KryoFactory like the following.

+ +

+package com.foo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+
+import com.foo.data.Edge;
+import com.foo.data.Node;
+
+public class MyKryoFactory implements KryoFactory {
+  @Override
+  public Kryo create() {
+    Kryo kryo = new Kryo();
+    
+    //Explicitly assign each class a unique id here to ensure it's stable over
+    //time and in different environments with different dependencies.
+    kryo.register(Node.class, 9);
+    kryo.register(Edge.class, 10);
+    
+    //instruct kryo that these are the only classes we expect to be serialized
+    kryo.setRegistrationRequired(true);
+    
+    return kryo;
+  }
+}
+
+
+ +

Fluo Recipes must be configured to use this factory. The following code shows how to do this.

+ +
  FluoConfiguration fluoConfig = ...;
+  KryoSimplerSerializer.setKryoFactory(fluoConfig, MyKryoFactory.class);
+  //initialize Fluo using fluoConfig
+
+
+ + +
+ +
+ +
+
+
+ +