fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [07/10] incubator-fluo-recipes git commit: Updated package names in core module
Date Fri, 15 Jul 2016 22:07:55 GMT
Updated package names in core module

* Packages now have 'org.apache.recipes.core.' prefix


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/beea3f96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/beea3f96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/beea3f96

Branch: refs/heads/master
Commit: beea3f96e5f633341d977d327f6aa1a0f12307f3
Parents: f1dce14
Author: Mike Walch <mwalch@gmail.com>
Authored: Fri Jul 15 11:31:28 2016 -0400
Committer: Mike Walch <mwalch@gmail.com>
Committed: Fri Jul 15 11:46:22 2016 -0400

----------------------------------------------------------------------
 docs/export-queue.md                            |   2 +-
 docs/row-hasher.md                              |   4 +-
 docs/serialization.md                           |   2 +-
 docs/transient.md                               |   2 +-
 .../recipes/accumulo/cmds/CompactTransient.java |   4 +-
 .../recipes/accumulo/cmds/OptimizeTable.java    |   2 +-
 .../accumulo/export/AccumuloExporter.java       |   4 +-
 .../accumulo/export/ReplicationExport.java      |   4 +-
 .../recipes/accumulo/ops/TableOperations.java   |   6 +-
 .../org/apache/fluo/recipes/common/Pirtos.java  |  83 ---
 .../apache/fluo/recipes/common/RowRange.java    |  82 ---
 .../fluo/recipes/common/TransientRegistry.java  |  79 ---
 .../apache/fluo/recipes/core/common/Pirtos.java |  83 +++
 .../fluo/recipes/core/common/RowRange.java      |  82 +++
 .../recipes/core/common/TransientRegistry.java  |  79 +++
 .../fluo/recipes/core/data/RowHasher.java       | 135 ++++
 .../apache/fluo/recipes/core/export/Export.java |  38 ++
 .../fluo/recipes/core/export/ExportBucket.java  | 203 ++++++
 .../fluo/recipes/core/export/ExportEntry.java   |  22 +
 .../recipes/core/export/ExportObserver.java     | 140 ++++
 .../fluo/recipes/core/export/ExportQueue.java   | 273 ++++++++
 .../fluo/recipes/core/export/Exporter.java      |  64 ++
 .../recipes/core/export/SequencedExport.java    |  29 +
 .../fluo/recipes/core/impl/BucketUtil.java      |  24 +
 .../fluo/recipes/core/map/CollisionFreeMap.java | 657 +++++++++++++++++++
 .../core/map/CollisionFreeMapObserver.java      |  53 ++
 .../apache/fluo/recipes/core/map/Combiner.java  |  31 +
 .../recipes/core/map/NullUpdateObserver.java    |  25 +
 .../apache/fluo/recipes/core/map/Update.java    |  43 ++
 .../fluo/recipes/core/map/UpdateObserver.java   |  34 +
 .../core/serialization/SimpleSerializer.java    |  56 ++
 .../fluo/recipes/core/transaction/LogEntry.java | 114 ++++
 .../core/transaction/RecordingTransaction.java  |  64 ++
 .../transaction/RecordingTransactionBase.java   | 250 +++++++
 .../fluo/recipes/core/transaction/TxLog.java    |  79 +++
 .../apache/fluo/recipes/core/types/Encoder.java |  86 +++
 .../fluo/recipes/core/types/StringEncoder.java  |  86 +++
 .../fluo/recipes/core/types/TypeLayer.java      | 488 ++++++++++++++
 .../fluo/recipes/core/types/TypedLoader.java    |  45 ++
 .../fluo/recipes/core/types/TypedObserver.java  |  46 ++
 .../fluo/recipes/core/types/TypedSnapshot.java  |  38 ++
 .../recipes/core/types/TypedSnapshotBase.java   | 555 ++++++++++++++++
 .../recipes/core/types/TypedTransaction.java    |  46 ++
 .../core/types/TypedTransactionBase.java        | 278 ++++++++
 .../org/apache/fluo/recipes/data/RowHasher.java | 135 ----
 .../org/apache/fluo/recipes/export/Export.java  |  38 --
 .../fluo/recipes/export/ExportBucket.java       | 203 ------
 .../apache/fluo/recipes/export/ExportEntry.java |  22 -
 .../fluo/recipes/export/ExportObserver.java     | 140 ----
 .../apache/fluo/recipes/export/ExportQueue.java | 273 --------
 .../apache/fluo/recipes/export/Exporter.java    |  64 --
 .../fluo/recipes/export/SequencedExport.java    |  29 -
 .../apache/fluo/recipes/impl/BucketUtil.java    |  24 -
 .../fluo/recipes/map/CollisionFreeMap.java      | 657 -------------------
 .../recipes/map/CollisionFreeMapObserver.java   |  53 --
 .../org/apache/fluo/recipes/map/Combiner.java   |  31 -
 .../fluo/recipes/map/NullUpdateObserver.java    |  25 -
 .../org/apache/fluo/recipes/map/Update.java     |  43 --
 .../apache/fluo/recipes/map/UpdateObserver.java |  34 -
 .../recipes/serialization/SimpleSerializer.java |  56 --
 .../fluo/recipes/transaction/LogEntry.java      | 114 ----
 .../transaction/RecordingTransaction.java       |  64 --
 .../transaction/RecordingTransactionBase.java   | 250 -------
 .../apache/fluo/recipes/transaction/TxLog.java  |  79 ---
 .../org/apache/fluo/recipes/types/Encoder.java  |  86 ---
 .../fluo/recipes/types/StringEncoder.java       |  86 ---
 .../apache/fluo/recipes/types/TypeLayer.java    | 488 --------------
 .../apache/fluo/recipes/types/TypedLoader.java  |  45 --
 .../fluo/recipes/types/TypedObserver.java       |  46 --
 .../fluo/recipes/types/TypedSnapshot.java       |  38 --
 .../fluo/recipes/types/TypedSnapshotBase.java   | 555 ----------------
 .../fluo/recipes/types/TypedTransaction.java    |  46 --
 .../recipes/types/TypedTransactionBase.java     | 278 --------
 .../fluo/recipes/common/TestGrouping.java       |  94 ---
 .../recipes/common/TransientRegistryTest.java   |  48 --
 .../fluo/recipes/core/common/TestGrouping.java  |  92 +++
 .../core/common/TransientRegistryTest.java      |  48 ++
 .../fluo/recipes/core/data/RowHasherTest.java   |  62 ++
 .../recipes/core/export/DocumentLoader.java     |  36 +
 .../recipes/core/export/DocumentObserver.java   |  63 ++
 .../recipes/core/export/ExportBufferIT.java     | 106 +++
 .../fluo/recipes/core/export/ExportQueueIT.java | 114 ++++
 .../recipes/core/export/ExportTestBase.java     | 286 ++++++++
 .../recipes/core/export/GsonSerializer.java     |  42 ++
 .../fluo/recipes/core/export/OptionsTest.java   |  51 ++
 .../fluo/recipes/core/export/RefInfo.java       |  26 +
 .../fluo/recipes/core/export/RefUpdates.java    |  43 ++
 .../fluo/recipes/core/map/BigUpdateIT.java      | 214 ++++++
 .../recipes/core/map/CollisionFreeMapIT.java    | 361 ++++++++++
 .../fluo/recipes/core/map/DocumentLoader.java   |  35 +
 .../fluo/recipes/core/map/DocumentObserver.java |  89 +++
 .../fluo/recipes/core/map/OptionsTest.java      |  51 ++
 .../fluo/recipes/core/map/SplitsTest.java       |  75 +++
 .../fluo/recipes/core/map/TestSerializer.java   |  45 ++
 .../recipes/core/map/WordCountCombiner.java     |  36 +
 .../recipes/core/map/WordCountObserver.java     |  47 ++
 .../transaction/RecordingTransactionTest.java   | 227 +++++++
 .../fluo/recipes/core/types/MockSnapshot.java   |  30 +
 .../recipes/core/types/MockSnapshotBase.java    | 202 ++++++
 .../recipes/core/types/MockTransaction.java     |  36 +
 .../recipes/core/types/MockTransactionBase.java |  90 +++
 .../fluo/recipes/core/types/TypeLayerTest.java  | 494 ++++++++++++++
 .../apache/fluo/recipes/data/RowHasherTest.java |  62 --
 .../fluo/recipes/export/DocumentLoader.java     |  36 -
 .../fluo/recipes/export/DocumentObserver.java   |  63 --
 .../fluo/recipes/export/ExportBufferIT.java     | 106 ---
 .../fluo/recipes/export/ExportQueueIT.java      | 114 ----
 .../fluo/recipes/export/ExportTestBase.java     | 286 --------
 .../fluo/recipes/export/GsonSerializer.java     |  42 --
 .../apache/fluo/recipes/export/OptionsTest.java |  51 --
 .../org/apache/fluo/recipes/export/RefInfo.java |  26 -
 .../apache/fluo/recipes/export/RefUpdates.java  |  43 --
 .../apache/fluo/recipes/map/BigUpdateIT.java    | 214 ------
 .../fluo/recipes/map/CollisionFreeMapIT.java    | 361 ----------
 .../apache/fluo/recipes/map/DocumentLoader.java |  35 -
 .../fluo/recipes/map/DocumentObserver.java      |  89 ---
 .../apache/fluo/recipes/map/OptionsTest.java    |  51 --
 .../org/apache/fluo/recipes/map/SplitsTest.java |  75 ---
 .../apache/fluo/recipes/map/TestSerializer.java |  45 --
 .../fluo/recipes/map/WordCountCombiner.java     |  36 -
 .../fluo/recipes/map/WordCountObserver.java     |  47 --
 .../transaction/RecordingTransactionTest.java   | 227 -------
 .../apache/fluo/recipes/types/MockSnapshot.java |  30 -
 .../fluo/recipes/types/MockSnapshotBase.java    | 202 ------
 .../fluo/recipes/types/MockTransaction.java     |  36 -
 .../fluo/recipes/types/MockTransactionBase.java |  90 ---
 .../fluo/recipes/types/TypeLayerTest.java       | 494 --------------
 .../recipes/kryo/KryoSimplerSerializer.java     |   2 +-
 .../serialization/KryoSimpleSerializerTest.java |  45 ++
 .../serialization/KryoSimpleSerializerTest.java |  45 --
 .../fluo/recipes/test/AccumuloExportITBase.java |   2 +-
 .../recipes/test/export/AccumuloExporterIT.java |   2 +-
 .../test/export/AccumuloReplicatorIT.java       |  10 +-
 133 files changed, 7315 insertions(+), 7317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index b85895f..0120113 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -279,7 +279,7 @@ example of write skew mentioned in the Percolater paper.
  1. TH1 : tx1.set(`rowA`,`fam1:qual2`, val1)
  1. TH2 : tx2.set(`rowB`,`fam1:qual2`, val2)
 
-[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
+[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
 [2]: https://en.wikipedia.org/wiki/Serializability
 [3]: accumulo-export.md
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/row-hasher.md
----------------------------------------------------------------------
diff --git a/docs/row-hasher.md b/docs/row-hasher.md
index adb423f..8db8af6 100644
--- a/docs/row-hasher.md
+++ b/docs/row-hasher.md
@@ -31,8 +31,8 @@ balancing of the prefix.
 
 ```java
 import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.data.RowHasher;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.data.RowHasher;
 
 public class RowHasherExample {
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/serialization.md
----------------------------------------------------------------------
diff --git a/docs/serialization.md b/docs/serialization.md
index d7e3b42..9b664a0 100644
--- a/docs/serialization.md
+++ b/docs/serialization.md
@@ -69,5 +69,5 @@ how to do this.
 ```
 
 [1]: https://github.com/EsotericSoftware/kryo
-[2]: ../modules/core/src/main/java/org/apache/fluo/recipes/serialization/SimpleSerializer.java
+[2]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
 [3]: https://github.com/EsotericSoftware/kryo#registration

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/transient.md
----------------------------------------------------------------------
diff --git a/docs/transient.md b/docs/transient.md
index be68493..d0ac845 100644
--- a/docs/transient.md
+++ b/docs/transient.md
@@ -76,5 +76,5 @@ first range takes 20 seconds to compact, then it will be compacted again in 600
 seconds.  If the second range takes 80 seconds to compact, then it will be
 compacted again in 800 seconds.
 
-[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java
+[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java
 [2]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java
index 7ce53c8..54e05bc 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java
@@ -27,8 +27,8 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.recipes.accumulo.ops.TableOperations;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
+import org.apache.fluo.recipes.core.common.RowRange;
+import org.apache.fluo.recipes.core.common.TransientRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
index 6b5276b..bf17cfd 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
@@ -19,7 +19,7 @@ import javax.inject.Inject;
 
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.recipes.accumulo.ops.TableOperations;
-import org.apache.fluo.recipes.common.Pirtos;
+import org.apache.fluo.recipes.core.common.Pirtos;
 
 public class OptimizeTable {
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index cd112eb..1788403 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -22,8 +22,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.fluo.recipes.export.Exporter;
-import org.apache.fluo.recipes.export.SequencedExport;
+import org.apache.fluo.recipes.core.export.Exporter;
+import org.apache.fluo.recipes.core.export.SequencedExport;
 
 /**
  * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
index ec2f4ac..9a3b196 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
@@ -25,8 +25,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.transaction.LogEntry;
-import org.apache.fluo.recipes.transaction.TxLog;
+import org.apache.fluo.recipes.core.transaction.LogEntry;
+import org.apache.fluo.recipes.core.transaction.TxLog;
 
 /**
  * An implementation of {@link AccumuloExport} that replicates a Fluo table to Accumulo using a

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
index e521014..62abd7e 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
@@ -28,9 +28,9 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.common.RowRange;
+import org.apache.fluo.recipes.core.common.TransientRegistry;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java
deleted file mode 100644
index bdb9e60..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.common;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.export.ExportQueue;
-import org.apache.fluo.recipes.map.CollisionFreeMap;
-
-/**
- * Post initialization recommended table optimizations.
- */
-
-public class Pirtos {
-  private List<Bytes> splits = new ArrayList<>();
-  private String tabletGroupingRegex = "";
-
-  public void setSplits(List<Bytes> splits) {
-    this.splits.clear();
-    this.splits.addAll(splits);
-  }
-
-  /**
-   * @return A recommended set of splits points to add to a Fluo table after initialization.
-   */
-  public List<Bytes> getSplits() {
-    return Collections.unmodifiableList(splits);
-  }
-
-  public void setTabletGroupingRegex(String tgr) {
-    Objects.requireNonNull(tgr);
-    this.tabletGroupingRegex = tgr;
-  }
-
-  public String getTabletGroupingRegex() {
-    return "(" + tabletGroupingRegex + ").*";
-  }
-
-  public void merge(Pirtos other) {
-    splits.addAll(other.splits);
-    if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) {
-      tabletGroupingRegex += "|" + other.tabletGroupingRegex;
-    } else {
-      tabletGroupingRegex += other.tabletGroupingRegex;
-    }
-  }
-
-  /**
-   * A utility method to get table optimizations for all configured recipes.
-   */
-  public static Pirtos getConfiguredOptimizations(FluoConfiguration fluoConfig) {
-    try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
-      SimpleConfiguration appConfig = client.getAppConfiguration();
-      Pirtos pirtos = new Pirtos();
-
-      pirtos.merge(ExportQueue.getTableOptimizations(appConfig));
-      pirtos.merge(CollisionFreeMap.getTableOptimizations(appConfig));
-
-      return pirtos;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java
deleted file mode 100644
index b1fbc23..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.common;
-
-import java.util.Objects;
-
-import org.apache.fluo.api.data.Bytes;
-
-public class RowRange {
-  private final Bytes start;
-  private final Bytes end;
-
-  public RowRange(Bytes start, Bytes end) {
-    Objects.requireNonNull(start);
-    Objects.requireNonNull(end);
-    this.start = start;
-    this.end = end;
-  }
-
-  public Bytes getStart() {
-    return start;
-  }
-
-  public Bytes getEnd() {
-    return end;
-  }
-
-  @Override
-  public int hashCode() {
-    return start.hashCode() + 31 * end.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof RowRange) {
-      RowRange or = (RowRange) o;
-      return start.equals(or.start) && end.equals(or.end);
-    }
-
-    return false;
-  }
-
-  private static void encNonAscii(StringBuilder sb, Bytes bytes) {
-    if (bytes == null) {
-      sb.append("null");
-    } else {
-      for (int i = 0; i < bytes.length(); i++) {
-        byte b = bytes.byteAt(i);
-        if (b >= 32 && b <= 126 && b != '\\') {
-          sb.append((char) b);
-        } else {
-          sb.append(String.format("\\x%02x", b & 0xff));
-        }
-      }
-    }
-  }
-
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("(");
-    encNonAscii(sb, start);
-    sb.append(", ");
-    encNonAscii(sb, end);
-    sb.append("]");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java
deleted file mode 100644
index 681d783..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.common;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * This class offers a standard way to register transient ranges. The project level documentation
- * provides a comprehensive overview.
- */
-
-public class TransientRegistry {
-
-  private SimpleConfiguration appConfig;
-  private static final String PREFIX = "recipes.transientRange.";
-
-  /**
-   * @param appConfig Fluo application config. Can be obtained from
-   *        {@link FluoConfiguration#getAppConfiguration()} before initializing fluo when adding
-   *        Transient ranges. After Fluo is initialized, app config can be obtained from
-   *        {@link FluoClient#getAppConfiguration()} or
-   *        {@link org.apache.fluo.api.observer.Observer.Context#getAppConfiguration()}
-   */
-  public TransientRegistry(SimpleConfiguration appConfig) {
-    this.appConfig = appConfig;
-  }
-
-  /**
-   * This method is expected to be called before Fluo is initialized to register transient ranges.
-   *
-   */
-  public void addTransientRange(String id, RowRange range) {
-    String start = DatatypeConverter.printHexBinary(range.getStart().toArray());
-    String end = DatatypeConverter.printHexBinary(range.getEnd().toArray());
-
-    appConfig.setProperty(PREFIX + id, start + ":" + end);
-  }
-
-  /**
-   * This method is expected to be called after Fluo is initialized to get the ranges that were
-   * registered before initialization.
-   */
-  public List<RowRange> getTransientRanges() {
-    List<RowRange> ranges = new ArrayList<>();
-    Iterator<String> keys = appConfig.getKeys(PREFIX.substring(0, PREFIX.length() - 1));
-    while (keys.hasNext()) {
-      String key = keys.next();
-      String val = appConfig.getString(key);
-      String[] sa = val.split(":");
-      RowRange rowRange =
-          new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])),
-              Bytes.of(DatatypeConverter.parseHexBinary(sa[1])));
-      ranges.add(rowRange);
-    }
-    return ranges;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java
new file mode 100644
index 0000000..5488e29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+
+/**
+ * Post initialization recommended table optimizations.
+ */
+
+public class Pirtos {
+  private List<Bytes> splits = new ArrayList<>();
+  private String tabletGroupingRegex = "";
+
+  public void setSplits(List<Bytes> splits) {
+    this.splits.clear();
+    this.splits.addAll(splits);
+  }
+
+  /**
+   * @return A recommended set of splits points to add to a Fluo table after initialization.
+   */
+  public List<Bytes> getSplits() {
+    return Collections.unmodifiableList(splits);
+  }
+
+  public void setTabletGroupingRegex(String tgr) {
+    Objects.requireNonNull(tgr);
+    this.tabletGroupingRegex = tgr;
+  }
+
+  public String getTabletGroupingRegex() {
+    return "(" + tabletGroupingRegex + ").*";
+  }
+
+  public void merge(Pirtos other) {
+    splits.addAll(other.splits);
+    if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) {
+      tabletGroupingRegex += "|" + other.tabletGroupingRegex;
+    } else {
+      tabletGroupingRegex += other.tabletGroupingRegex;
+    }
+  }
+
+  /**
+   * A utility method to get table optimizations for all configured recipes.
+   */
+  public static Pirtos getConfiguredOptimizations(FluoConfiguration fluoConfig) {
+    try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
+      SimpleConfiguration appConfig = client.getAppConfiguration();
+      Pirtos pirtos = new Pirtos();
+
+      pirtos.merge(ExportQueue.getTableOptimizations(appConfig));
+      pirtos.merge(CollisionFreeMap.getTableOptimizations(appConfig));
+
+      return pirtos;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java
new file mode 100644
index 0000000..913357b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Bytes;
+
+public class RowRange {
+  private final Bytes start;
+  private final Bytes end;
+
+  public RowRange(Bytes start, Bytes end) {
+    Objects.requireNonNull(start);
+    Objects.requireNonNull(end);
+    this.start = start;
+    this.end = end;
+  }
+
+  public Bytes getStart() {
+    return start;
+  }
+
+  public Bytes getEnd() {
+    return end;
+  }
+
+  @Override
+  public int hashCode() {
+    return start.hashCode() + 31 * end.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof RowRange) {
+      RowRange or = (RowRange) o;
+      return start.equals(or.start) && end.equals(or.end);
+    }
+
+    return false;
+  }
+
+  private static void encNonAscii(StringBuilder sb, Bytes bytes) {
+    if (bytes == null) {
+      sb.append("null");
+    } else {
+      for (int i = 0; i < bytes.length(); i++) {
+        byte b = bytes.byteAt(i);
+        if (b >= 32 && b <= 126 && b != '\\') {
+          sb.append((char) b);
+        } else {
+          sb.append(String.format("\\x%02x", b & 0xff));
+        }
+      }
+    }
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(");
+    encNonAscii(sb, start);
+    sb.append(", ");
+    encNonAscii(sb, end);
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java
new file mode 100644
index 0000000..9533a7a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * This class offers a standard way to register transient ranges. The project level documentation
+ * provides a comprehensive overview.
+ */
+
+public class TransientRegistry {
+
+  private SimpleConfiguration appConfig;
+  private static final String PREFIX = "recipes.transientRange.";
+
+  /**
+   * @param appConfig Fluo application config. Can be obtained from
+   *        {@link FluoConfiguration#getAppConfiguration()} before initializing fluo when adding
+   *        Transient ranges. After Fluo is initialized, app config can be obtained from
+   *        {@link FluoClient#getAppConfiguration()} or
+   *        {@link org.apache.fluo.api.observer.Observer.Context#getAppConfiguration()}
+   */
+  public TransientRegistry(SimpleConfiguration appConfig) {
+    this.appConfig = appConfig;
+  }
+
+  /**
+   * This method is expected to be called before Fluo is initialized to register transient ranges.
+   *
+   */
+  public void addTransientRange(String id, RowRange range) {
+    String start = DatatypeConverter.printHexBinary(range.getStart().toArray());
+    String end = DatatypeConverter.printHexBinary(range.getEnd().toArray());
+
+    appConfig.setProperty(PREFIX + id, start + ":" + end);
+  }
+
+  /**
+   * This method is expected to be called after Fluo is initialized to get the ranges that were
+   * registered before initialization.
+   */
+  public List<RowRange> getTransientRanges() {
+    List<RowRange> ranges = new ArrayList<>();
+    Iterator<String> keys = appConfig.getKeys(PREFIX.substring(0, PREFIX.length() - 1));
+    while (keys.hasNext()) {
+      String key = keys.next();
+      String val = appConfig.getString(key);
+      String[] sa = val.split(":");
+      RowRange rowRange =
+          new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])),
+              Bytes.of(DatatypeConverter.parseHexBinary(sa[1])));
+      ranges.add(rowRange);
+    }
+    return ranges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
new file mode 100644
index 0000000..e09bed5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.data;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.BytesBuilder;
+import org.apache.fluo.recipes.core.common.Pirtos;
+
+/**
+ * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe
+ * rows are structured like the following.
+ * 
+ * <p>
+ * {@code <prefix>:<fixed len row hash>:<user row>}
+ * 
+ * <p>
+ * The recipe also provides code the help generate split points and configure balancing of the
+ * prefix.
+ * 
+ * <p>
+ * The project documentation has more information.
+ */
+public class RowHasher {
+
+  private static final int HASH_LEN = 4;
+
+  public Pirtos getTableOptimizations(int numTablets) {
+
+    List<Bytes> splits = new ArrayList<>(numTablets - 1);
+
+    int numSplits = numTablets - 1;
+    int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1;
+    int split = distance;
+    for (int i = 0; i < numSplits; i++) {
+      splits.add(Bytes.of(prefix
+          + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0')));
+      split += distance;
+    }
+
+    splits.add(Bytes.of(prefix + "~"));
+
+
+    Pirtos pirtos = new Pirtos();
+    pirtos.setSplits(splits);
+    pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
+
+    return pirtos;
+  }
+
+
+  private Bytes prefix;
+
+  public RowHasher(String prefix) {
+    this.prefix = Bytes.of(prefix + ":");
+  }
+
+  /**
+   * @return Returns input with prefix and hash of input prepended.
+   */
+  public Bytes addHash(String row) {
+    return addHash(Bytes.of(row));
+  }
+
+  /**
+   * @return Returns input with prefix and hash of input prepended.
+   */
+  public Bytes addHash(Bytes row) {
+    BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length());
+    builder.append(prefix);
+    builder.append(genHash(row));
+    builder.append(":");
+    builder.append(row);
+    return builder.toBytes();
+  }
+
+  private boolean hasHash(Bytes row) {
+    for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) {
+      byte b = row.byteAt(i);
+      boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+      if (!isAlphaNum) {
+        return false;
+      }
+    }
+
+    if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * @return Returns input with prefix and hash stripped from beginning.
+   */
+  public Bytes removeHash(Bytes row) {
+    Preconditions.checkArgument(row.length() >= prefix.length() + 5,
+        "Row is shorter than expected " + row);
+    Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix),
+        "Row does not have expected prefix " + row);
+    Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row);
+    return row.subSequence(prefix.length() + 5, row.length());
+  }
+
+  private static String genHash(Bytes row) {
+    int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+    hash = hash & 0x7fffffff;
+    // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
+    // nice for debugging.
+    String hashString =
+        Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+    hashString = hashString.substring(hashString.length() - HASH_LEN);
+
+    return hashString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java
new file mode 100644
index 0000000..4f14f65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Objects;
+
+public class Export<K, V> {
+  private final K key;
+  private final V value;
+
+  public Export(K key, V val) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(val);
+    this.key = key;
+    this.value = val;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public V getValue() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
new file mode 100644
index 0000000..cf6dfb4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.impl.BucketUtil;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+/**
+ * This class encapsulates a buckets serialization code.
+ */
+class ExportBucket {
+  private static final String NOTIFICATION_CF = "fluoRecipes";
+  private static final String NOTIFICATION_CQ_PREFIX = "eq:";
+  private static final Column EXPORT_COL = new Column("e", "v");
+  private static final Column NEXT_COL = new Column("e", "next");
+
+  static Column newNotificationColumn(String queueId) {
+    return new Column(NOTIFICATION_CF, NOTIFICATION_CQ_PREFIX + queueId);
+  }
+
+  private final TypedTransactionBase ttx;
+  private final String qid;
+  private final Bytes bucketRow;
+
+  static Bytes generateBucketRow(String qid, int bucket, int numBuckets) {
+    return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets));
+  }
+
+  ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) {
+    // TODO encode in a more robust way... but for now fail early
+    Preconditions.checkArgument(!qid.contains(":"), "Export QID can not contain :");
+    this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
+    this.qid = qid;
+    this.bucketRow = generateBucketRow(qid, bucket, numBuckets);
+  }
+
+  ExportBucket(TransactionBase tx, Bytes bucketRow) {
+    this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
+
+    int colonLoc = -1;
+
+    for (int i = 0; i < bucketRow.length(); i++) {
+      if (bucketRow.byteAt(i) == ':') {
+        colonLoc = i;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(colonLoc != -1 && colonLoc != bucketRow.length(),
+        "Invalid bucket row " + bucketRow);
+    Preconditions.checkArgument(bucketRow.byteAt(bucketRow.length() - 1) == ':',
+        "Invalid bucket row " + bucketRow);
+
+    this.bucketRow = bucketRow.subSequence(0, bucketRow.length() - 1);
+    this.qid = bucketRow.subSequence(0, colonLoc).toString();
+  }
+
+  private static byte[] encSeq(long l) {
+    byte[] ret = new byte[8];
+    ret[0] = (byte) (l >>> 56);
+    ret[1] = (byte) (l >>> 48);
+    ret[2] = (byte) (l >>> 40);
+    ret[3] = (byte) (l >>> 32);
+    ret[4] = (byte) (l >>> 24);
+    ret[5] = (byte) (l >>> 16);
+    ret[6] = (byte) (l >>> 8);
+    ret[7] = (byte) (l >>> 0);
+    return ret;
+  }
+
+  private static long decodeSeq(Bytes seq) {
+    return (((long) seq.byteAt(0) << 56) + ((long) (seq.byteAt(1) & 255) << 48)
+        + ((long) (seq.byteAt(2) & 255) << 40) + ((long) (seq.byteAt(3) & 255) << 32)
+        + ((long) (seq.byteAt(4) & 255) << 24) + ((seq.byteAt(5) & 255) << 16)
+        + ((seq.byteAt(6) & 255) << 8) + ((seq.byteAt(7) & 255) << 0));
+  }
+
+
+  public void add(long seq, byte[] key, byte[] value) {
+    Bytes row =
+        Bytes.newBuilder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":")
+            .append(key).append(encSeq(seq)).toBytes();
+    ttx.set(row, EXPORT_COL, Bytes.of(value));
+  }
+
+  /**
+   * Computes the minimial row for a bucket
+   */
+  private Bytes getMinimalRow() {
+    return Bytes.newBuilder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes();
+  }
+
+  public void notifyExportObserver() {
+    ttx.mutate().row(getMinimalRow()).col(newNotificationColumn(qid)).weaklyNotify();
+  }
+
+  public Iterator<ExportEntry> getExportIterator(Bytes continueRow) {
+    ScannerConfiguration sc = new ScannerConfiguration();
+
+    if (continueRow != null) {
+      Span tmpSpan = Span.prefix(bucketRow);
+      Span nextSpan =
+          new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(),
+              tmpSpan.isEndInclusive());
+      sc.setSpan(nextSpan);
+    } else {
+      sc.setSpan(Span.prefix(bucketRow));
+    }
+
+    sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier());
+    RowIterator iter = ttx.get(sc);
+
+    if (iter.hasNext()) {
+      return new ExportIterator(iter);
+    } else {
+      return Collections.<ExportEntry>emptySet().iterator();
+    }
+  }
+
+  private class ExportIterator implements Iterator<ExportEntry> {
+
+    private RowIterator rowIter;
+    private Bytes lastRow;
+
+    public ExportIterator(RowIterator rowIter) {
+      this.rowIter = rowIter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return rowIter.hasNext();
+    }
+
+    @Override
+    public ExportEntry next() {
+      Entry<Bytes, ColumnIterator> rowCol = rowIter.next();
+      Bytes row = rowCol.getKey();
+
+      Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8);
+      Bytes seqBytes = row.subSequence(row.length() - 8, row.length());
+
+      ExportEntry ee = new ExportEntry();
+
+      ee.key = keyBytes.toArray();
+      ee.seq = decodeSeq(seqBytes);
+      // TODO maybe leave as Bytes?
+      ee.value = rowCol.getValue().next().getValue().toArray();
+
+      lastRow = row;
+
+      return ee;
+    }
+
+    @Override
+    public void remove() {
+      ttx.mutate().row(lastRow).col(EXPORT_COL).delete();
+    }
+  }
+
+  public Bytes getContinueRow() {
+    return ttx.get(getMinimalRow(), NEXT_COL);
+  }
+
+  public void setContinueRow(ExportEntry ee) {
+    Bytes nextRow =
+        Bytes.newBuilder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":")
+            .append(ee.key).append(encSeq(ee.seq)).toBytes();
+
+    ttx.set(getMinimalRow(), NEXT_COL, nextRow);
+  }
+
+  public void clearContinueRow() {
+    ttx.delete(getMinimalRow(), NEXT_COL);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java
new file mode 100644
index 0000000..e680204
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+class ExportEntry {
+  byte[] key;
+  long seq;
+  byte[] value;
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
new file mode 100644
index 0000000..940575b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.Iterators;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class ExportObserver<K, V> extends AbstractObserver {
+
+  private static class MemLimitIterator implements Iterator<ExportEntry> {
+
+    private long memConsumed = 0;
+    private long memLimit;
+    private int extraPerKey;
+    private Iterator<ExportEntry> source;
+
+    public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) {
+      this.source = input;
+      this.memLimit = limit;
+      this.extraPerKey = extraPerKey;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return memConsumed < memLimit && source.hasNext();
+    }
+
+    @Override
+    public ExportEntry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      ExportEntry ee = source.next();
+      memConsumed += ee.key.length + extraPerKey + ee.value.length;
+      return ee;
+    }
+
+    @Override
+    public void remove() {
+      source.remove();
+    }
+  }
+
+  private String queueId;
+  private Class<K> keyType;
+  private Class<V> valType;
+  SimpleSerializer serializer;
+  private Exporter<K, V> exporter;
+
+  private long memLimit;
+
+  protected String getQueueId() {
+    return queueId;
+  }
+
+  SimpleSerializer getSerializer() {
+    return serializer;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Context context) throws Exception {
+    queueId = context.getParameters().get("queueId");
+    ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration());
+
+    // TODO defer loading classes... so that not done during fluo init
+    // TODO move class loading to centralized place... also attempt to check type params
+    keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
+    valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
+    exporter =
+        getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class)
+            .newInstance();
+
+    serializer = SimpleSerializer.getInstance(context.getAppConfiguration());
+
+    memLimit = opts.getBufferSize();
+
+    exporter.init(queueId, context);
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    return new ObservedColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK);
+  }
+
+  @Override
+  public void process(TransactionBase tx, Bytes row, Column column) throws Exception {
+    ExportBucket bucket = new ExportBucket(tx, row);
+
+    Bytes continueRow = bucket.getContinueRow();
+
+    Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
+    MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length());
+
+    Iterator<SequencedExport<K, V>> exportIterator =
+        Iterators.transform(
+            memLimitIter,
+            ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer
+                .deserialize(ee.value, valType), ee.seq));
+
+    exportIterator = Iterators.consumingIterator(exportIterator);
+
+    exporter.processExports(exportIterator);
+
+    if (input.hasNext()) {
+      // not everything was processed so notify self
+      bucket.notifyExportObserver();
+
+      if (!memLimitIter.hasNext()) {
+        // stopped because of mem limit... set continue key
+        bucket.setContinueRow(input.next());
+        continueRow = null;
+      }
+    }
+
+    if (continueRow != null) {
+      bucket.clearContinueRow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
new file mode 100644
index 0000000..f013872
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hashing;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.common.RowRange;
+import org.apache.fluo.recipes.core.common.TransientRegistry;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class ExportQueue<K, V> {
+
+  private static final String RANGE_BEGIN = "#";
+  private static final String RANGE_END = ":~";
+
+  private int numBuckets;
+  private SimpleSerializer serializer;
+  private String queueId;
+
+  // usage hint : could be created once in an observers init method
+  // usage hint : maybe have a queue for each type of data being exported???
+  // maybe less queues are
+  // more efficient though because more batching at export time??
+  ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
+    // TODO sanity check key type based on type params
+    // TODO defer creating classes until needed.. so that its not done during Fluo init
+    this.queueId = opts.queueId;
+    this.numBuckets = opts.numBuckets;
+    this.serializer = serializer;
+  }
+
+  public void add(TransactionBase tx, K key, V value) {
+    addAll(tx, Collections.singleton(new Export<>(key, value)).iterator());
+  }
+
+  public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
+
+    Set<Integer> bucketsNotified = new HashSet<>();
+    while (exports.hasNext()) {
+      Export<K, V> export = exports.next();
+
+      byte[] k = serializer.serialize(export.getKey());
+      byte[] v = serializer.serialize(export.getValue());
+
+      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+      int bucketId = Math.abs(hash % numBuckets);
+
+      ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets);
+      bucket.add(tx.getStartTimestamp(), k, v);
+
+      if (!bucketsNotified.contains(bucketId)) {
+        bucket.notifyExportObserver();
+        bucketsNotified.add(bucketId);
+      }
+    }
+  }
+
+  public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId,
+      SimpleConfiguration appConfig) {
+    Options opts = new Options(exportQueueId, appConfig);
+    try {
+      return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig));
+    } catch (Exception e) {
+      // TODO
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Call this method before initializing Fluo.
+   *
+   * @param fluoConfig The configuration that will be used to initialize fluo.
+   */
+  public static void configure(FluoConfiguration fluoConfig, Options opts) {
+    SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
+    opts.save(appConfig);
+
+    fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName())
+        .setParameters(Collections.singletonMap("queueId", opts.queueId)));
+
+    Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+    Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+
+    new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
+        + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for all previously configured export queues.
+   *
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+
+  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
+    HashSet<String> queueIds = new HashSet<>();
+    appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+        k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+
+    Pirtos pirtos = new Pirtos();
+    queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig)));
+
+    return pirtos;
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for the specified export queue.
+   *
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+  public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
+    Options opts = new Options(queueId, appConfig);
+
+    List<Bytes> splits = new ArrayList<>();
+
+    Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+    Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+
+    splits.add(exportRangeStart);
+    splits.add(exportRangeStop);
+
+    List<Bytes> exportSplits = new ArrayList<>();
+    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+      exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
+    }
+    Collections.sort(exportSplits);
+    splits.addAll(exportSplits);
+
+    Pirtos pirtos = new Pirtos();
+    pirtos.setSplits(splits);
+
+    // the tablet with end row <queueId># does not contain any data for the export queue and
+    // should not be grouped with the export queue
+    pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
+
+    return pirtos;
+  }
+
+  public static class Options {
+
+    private static final String PREFIX = "recipes.exportQueue.";
+    static final long DEFAULT_BUFFER_SIZE = 1 << 20;
+    static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+
+    int numBuckets;
+    Integer bucketsPerTablet = null;
+    Long bufferSize;
+
+    String keyType;
+    String valueType;
+    String exporterType;
+    String queueId;
+
+    Options(String queueId, SimpleConfiguration appConfig) {
+      this.queueId = queueId;
+
+      this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
+      this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
+      this.keyType = appConfig.getString(PREFIX + queueId + ".key");
+      this.valueType = appConfig.getString(PREFIX + queueId + ".val");
+      this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+      this.bucketsPerTablet =
+          appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+    }
+
+    public Options(String queueId, String exporterType, String keyType, String valueType,
+        int buckets) {
+      Preconditions.checkArgument(buckets > 0);
+
+      this.queueId = queueId;
+      this.numBuckets = buckets;
+      this.exporterType = exporterType;
+      this.keyType = keyType;
+      this.valueType = valueType;
+    }
+
+
+    public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter,
+        Class<K> keyType, Class<V> valueType, int buckets) {
+      this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
+    }
+
+    /**
+     * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+     * be used to actually deserialize and process the updates. This limit does not account for
+     * object overhead in java, which can be significant.
+     *
+     * <p>
+     * The way memory read is calculated is by summing the length of serialized key and value byte
+     * arrays. Once this sum exceeds the configured memory limit, no more export key values are
+     * processed in the current transaction. When not everything is processed, the observer
+     * processing exports will notify itself causing another transaction to continue processing
+     * later.
+     */
+    public Options setBufferSize(long bufferSize) {
+      Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    long getBufferSize() {
+      if (bufferSize == null) {
+        return DEFAULT_BUFFER_SIZE;
+      }
+
+      return bufferSize;
+    }
+
+    /**
+     * Sets the number of buckets per tablet to generate. This affects how many split points will be
+     * generated when optimizing the Accumulo table.
+     *
+     */
+    public Options setBucketsPerTablet(int bucketsPerTablet) {
+      Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+          + bucketsPerTablet);
+      this.bucketsPerTablet = bucketsPerTablet;
+      return this;
+    }
+
+    int getBucketsPerTablet() {
+      if (bucketsPerTablet == null) {
+        return DEFAULT_BUCKETS_PER_TABLET;
+      }
+
+      return bucketsPerTablet;
+    }
+
+    void save(SimpleConfiguration appConfig) {
+      appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
+      appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
+      appConfig.setProperty(PREFIX + queueId + ".key", keyType);
+      appConfig.setProperty(PREFIX + queueId + ".val", valueType);
+
+      if (bufferSize != null) {
+        appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
+      }
+      if (bucketsPerTablet != null) {
+        appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
new file mode 100644
index 0000000..529d5f3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+public abstract class Exporter<K, V> {
+
+  public void init(String queueId, Context observerContext) throws Exception {}
+
+  /**
+   * Must be able to handle same key being exported multiple times and key being exported out of
+   * order. The sequence number is meant to help with this.
+   *
+   * <p>
+   * If multiple export entries with the same key are passed in, then the entries with the same key
+   * will be consecutive and in ascending sequence order.
+   *
+   * <p>
+   * If the call to process exports is unexpectedly terminated, it will be called again later with
+   * at least the same data. For example suppose an exporter was passed the following entries.
+   *
+   * <ul>
+   * <li>key=0 sequence=9 value=abc
+   * <li>key=1 sequence=13 value=d
+   * <li>key=1 sequence=17 value=e
+   * <li>key=1 sequence=23 value=f
+   * <li>key=2 sequence=19 value=x
+   * </ul>
+   *
+   * <p>
+   * Assume the exporter exports some of these and then fails before completing all of them. The
+   * next time its called it will be passed what it saw before, but it could also be passed more.
+   *
+   * <ul>
+   * <li>key=0 sequence=9 value=abc
+   * <li>key=1 sequence=13 value=d
+   * <li>key=1 sequence=17 value=e
+   * <li>key=1 sequence=23 value=f
+   * <li>key=1 sequence=29 value=g
+   * <li>key=2 sequence=19 value=x
+   * <li>key=2 sequence=77 value=y
+   * </ul>
+   *
+   */
+  protected abstract void processExports(Iterator<SequencedExport<K, V>> exports);
+
+  // TODO add close
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
new file mode 100644
index 0000000..ef1cfe2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+public class SequencedExport<K, V> extends Export<K, V> {
+  private final long seq;
+
+  SequencedExport(K k, V v, long seq) {
+    super(k, v);
+    this.seq = seq;
+  }
+
+  public long getSequence() {
+    return seq;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
new file mode 100644
index 0000000..06bdfa8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.impl;
+
+public class BucketUtil {
+  public static String genBucketId(int bucket, int maxBucket) {
+    int bucketLen = Integer.toHexString(maxBucket).length();
+    // TODO printf is slow
+    return String.format("%0" + bucketLen + "x", bucket);
+  }
+}


Mime
View raw message