fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [2/2] incubator-fluo-recipes git commit: Misc release prep cleanup
Date Mon, 17 Oct 2016 14:56:28 GMT
Misc release prep cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/10ccb16a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/10ccb16a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/10ccb16a

Branch: refs/heads/master
Commit: 10ccb16aedd8c141f5bd21be6822481471afa7ad
Parents: 573aeb7
Author: Keith Turner <kturner@apache.org>
Authored: Fri Oct 14 17:04:59 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Oct 14 17:43:06 2016 -0400

----------------------------------------------------------------------
 README.md                                       |  16 +
 docs/accumulo-export-queue.md                   |  16 +
 docs/cfm.md                                     |  16 +
 docs/export-queue.md                            |  16 +
 docs/recording-tx.md                            |  16 +
 docs/row-hasher.md                              |  16 +
 docs/serialization.md                           |  16 +
 docs/table-optimization.md                      |  16 +
 docs/testing.md                                 |  16 +
 docs/transient.md                               |  16 +
 modules/accumulo/pom.xml                        |   2 +-
 modules/core/pom.xml                            |   2 +-
 .../recipes/core/common/TableOptimizations.java |   2 +-
 .../fluo/recipes/core/export/ExportBucket.java  |  14 +-
 .../fluo/recipes/core/impl/BucketUtil.java      |  31 --
 .../fluo/recipes/core/map/CollisionFreeMap.java |  22 +-
 .../recipes/core/export/DocumentLoader.java     |  35 --
 .../recipes/core/export/DocumentObserver.java   |  63 ----
 .../recipes/core/export/ExportBufferIT.java     | 106 ------
 .../fluo/recipes/core/export/ExportQueueIT.java | 114 ------
 .../recipes/core/export/ExportTestBase.java     | 271 --------------
 .../recipes/core/export/GsonSerializer.java     |  42 ---
 .../fluo/recipes/core/export/OptionsTest.java   |   1 +
 .../fluo/recipes/core/export/RefInfo.java       |  26 --
 .../fluo/recipes/core/export/RefUpdates.java    |  43 ---
 .../recipes/core/export/it/DocumentLoader.java  |  35 ++
 .../core/export/it/DocumentObserver.java        |  64 ++++
 .../recipes/core/export/it/ExportBufferIT.java  | 107 ++++++
 .../recipes/core/export/it/ExportQueueIT.java   | 114 ++++++
 .../recipes/core/export/it/ExportTestBase.java  | 274 ++++++++++++++
 .../recipes/core/export/it/GsonSerializer.java  |  42 +++
 .../fluo/recipes/core/export/it/RefInfo.java    |  26 ++
 .../fluo/recipes/core/export/it/RefUpdates.java |  43 +++
 .../fluo/recipes/core/map/BigUpdateIT.java      | 208 -----------
 .../recipes/core/map/CollisionFreeMapIT.java    | 352 ------------------
 .../fluo/recipes/core/map/DocumentLoader.java   |  35 --
 .../fluo/recipes/core/map/DocumentObserver.java |  89 -----
 .../fluo/recipes/core/map/SplitsTest.java       |   1 +
 .../fluo/recipes/core/map/TestSerializer.java   |  45 ---
 .../recipes/core/map/WordCountCombiner.java     |  36 --
 .../recipes/core/map/WordCountObserver.java     |  47 ---
 .../fluo/recipes/core/map/it/BigUpdateIT.java   | 212 +++++++++++
 .../recipes/core/map/it/CollisionFreeMapIT.java | 353 +++++++++++++++++++
 .../recipes/core/map/it/DocumentLoader.java     |  35 ++
 .../recipes/core/map/it/DocumentObserver.java   |  90 +++++
 .../recipes/core/map/it/TestSerializer.java     |  45 +++
 .../recipes/core/map/it/WordCountCombiner.java  |  38 ++
 .../recipes/core/map/it/WordCountObserver.java  |  49 +++
 modules/kryo/pom.xml                            |   2 +-
 .../recipes/kryo/KryoSimplerSerializer.java     |   2 +-
 .../serialization/KryoSimpleSerializerTest.java |  45 ---
 .../recipes/kryo/KryoSimpleSerializerTest.java  |  46 +++
 modules/spark/pom.xml                           |   2 +-
 .../fluo/recipes/spark/FluoSparkHelperIT.java   |  82 -----
 .../recipes/spark/it/FluoSparkHelperIT.java     |  84 +++++
 modules/test/pom.xml                            |   2 +-
 .../fluo/recipes/test/AccumuloExportITBase.java |   1 -
 pom.xml                                         |  38 +-
 58 files changed, 1856 insertions(+), 1722 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b592204..e5d3a5a 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Apache Fluo Recipes
 
 [![Build Status][ti]][tl] [![Apache License][li]][ll]

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
index b880d58..8b4bf39 100644
--- a/docs/accumulo-export-queue.md
+++ b/docs/accumulo-export-queue.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Accumulo Export Queue Specialization
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/cfm.md
----------------------------------------------------------------------
diff --git a/docs/cfm.md b/docs/cfm.md
index 7b3a2ac..85b05b0 100644
--- a/docs/cfm.md
+++ b/docs/cfm.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Collision Free Map Recipe
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index c366e9d..7ad931f 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Export Queue Recipe
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/recording-tx.md
----------------------------------------------------------------------
diff --git a/docs/recording-tx.md b/docs/recording-tx.md
index 0dca693..7a6fb8e 100644
--- a/docs/recording-tx.md
+++ b/docs/recording-tx.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # RecordingTransaction recipe
 
 A `RecordingTransaction` is an implementation of `Transaction` that logs all transaction operations

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/row-hasher.md
----------------------------------------------------------------------
diff --git a/docs/row-hasher.md b/docs/row-hasher.md
index 1f5158d..d6d603b 100644
--- a/docs/row-hasher.md
+++ b/docs/row-hasher.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Row hash prefix recipe
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/serialization.md
----------------------------------------------------------------------
diff --git a/docs/serialization.md b/docs/serialization.md
index 9b664a0..25b9d83 100644
--- a/docs/serialization.md
+++ b/docs/serialization.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Serializing Data
 
 Various Fluo Recipes deal with POJOs and need to serialize them.  The

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/table-optimization.md
----------------------------------------------------------------------
diff --git a/docs/table-optimization.md b/docs/table-optimization.md
index 7235981..f170b77 100644
--- a/docs/table-optimization.md
+++ b/docs/table-optimization.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Fluo Table Optimization
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/testing.md
----------------------------------------------------------------------
diff --git a/docs/testing.md b/docs/testing.md
index 520fd1d..0908857 100644
--- a/docs/testing.md
+++ b/docs/testing.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Testing
 
 Fluo includes MiniFluo which makes it possible to write an integeration test that

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/transient.md
----------------------------------------------------------------------
diff --git a/docs/transient.md b/docs/transient.md
index d0ac845..a73a197 100644
--- a/docs/transient.md
+++ b/docs/transient.md
@@ -1,3 +1,19 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 # Transient data
 
 ## Background

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index 945b9fc..751fbee 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.fluo</groupId>
-    <artifactId>fluo-recipes-parent</artifactId>
+    <artifactId>fluo-recipes</artifactId>
     <version>1.0.0-incubating-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 4d6893d..720d5f0 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.fluo</groupId>
-    <artifactId>fluo-recipes-parent</artifactId>
+    <artifactId>fluo-recipes</artifactId>
     <version>1.0.0-incubating-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
index 3f0e5f2..f2833c6 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
@@ -77,7 +77,7 @@ public class TableOptimizations {
    * initialization. After Fluo is initialized, the optimizations can be retrieved by calling
    * {@link #getConfiguredOptimizations(FluoConfiguration)}.
    * 
-   * @param application config, likely obtained from calling
+   * @param appConfig config, likely obtained from calling
    *        {@link FluoConfiguration#getAppConfiguration()}
    * @param key A unique identifier for the optimization
    * @param clazz The optimization factory type.

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
index 6dadb8c..8100b37 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
@@ -18,6 +18,7 @@ package org.apache.fluo.recipes.core.export;
 import java.util.Iterator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.data.Bytes;
@@ -25,7 +26,6 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.recipes.core.impl.BucketUtil;
 import org.apache.fluo.recipes.core.types.StringEncoder;
 import org.apache.fluo.recipes.core.types.TypeLayer;
 import org.apache.fluo.recipes.core.types.TypedTransactionBase;
@@ -47,8 +47,18 @@ class ExportBucket {
   private final String qid;
   private final Bytes bucketRow;
 
+  static String genBucketId(int bucket, int maxBucket) {
+    Preconditions.checkArgument(bucket >= 0);
+    Preconditions.checkArgument(maxBucket > 0);
+
+    int bits = 32 - Integer.numberOfLeadingZeros(maxBucket);
+    int bucketLen = bits / 4 + (bits % 4 > 0 ? 1 : 0);
+
+    return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0');
+  }
+
   static Bytes generateBucketRow(String qid, int bucket, int numBuckets) {
-    return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets));
+    return Bytes.of(qid + ":" + genBucketId(bucket, numBuckets));
   }
 
   ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
deleted file mode 100644
index be28878..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-public class BucketUtil {
-  public static String genBucketId(int bucket, int maxBucket) {
-    Preconditions.checkArgument(bucket >= 0);
-    Preconditions.checkArgument(maxBucket > 0);
-
-    int bits = 32 - Integer.numberOfLeadingZeros(maxBucket);
-    int bucketLen = bits / 4 + (bits % 4 > 0 ? 1 : 0);
-
-    return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0');
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index d551096..715d330 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
@@ -48,7 +49,6 @@ import org.apache.fluo.recipes.core.common.TableOptimizations;
 import org.apache.fluo.recipes.core.common.TableOptimizations.TableOptimizationsFactory;
 import org.apache.fluo.recipes.core.common.RowRange;
 import org.apache.fluo.recipes.core.common.TransientRegistry;
-import org.apache.fluo.recipes.core.impl.BucketUtil;
 import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
 
 /**
@@ -290,7 +290,7 @@ public class CollisionFreeMap<K, V> {
     byte[] k = serializer.serialize(key);
 
     int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-    String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+    String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets);
 
 
     BytesBuilder rowBuilder = Bytes.builder();
@@ -352,7 +352,7 @@ public class CollisionFreeMap<K, V> {
     for (Entry<K, V> entry : updates.entrySet()) {
       byte[] k = serializer.serialize(entry.getKey());
       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+      String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets);
 
       // reset to the common row prefix
       rowBuilder.setLength(prefixLength);
@@ -377,6 +377,16 @@ public class CollisionFreeMap<K, V> {
     }
   }
 
+  static String genBucketId(int bucket, int maxBucket) {
+    Preconditions.checkArgument(bucket >= 0);
+    Preconditions.checkArgument(maxBucket > 0);
+
+    int bits = 32 - Integer.numberOfLeadingZeros(maxBucket);
+    int bucketLen = bits / 4 + (bits % 4 > 0 ? 1 : 0);
+
+    return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0');
+  }
+
   public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
       SimpleConfiguration appConf) {
     Options opts = new Options(mapId, appConf);
@@ -424,7 +434,7 @@ public class CollisionFreeMap<K, V> {
     public RowColumnValue convert(K2 key, V2 val) {
       byte[] k = serializer.serialize(key);
       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+      String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets);
 
       BytesBuilder bb = Bytes.builder();
       Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes();
@@ -605,7 +615,7 @@ public class CollisionFreeMap<K, V> {
 
       List<Bytes> dataSplits = new ArrayList<>();
       for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
-        String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+        String bucketId = genBucketId(i, opts.numBuckets);
         rowBuilder.setLength(mapId.length());
         dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
       }
@@ -613,7 +623,7 @@ public class CollisionFreeMap<K, V> {
 
       List<Bytes> updateSplits = new ArrayList<>();
       for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
-        String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+        String bucketId = genBucketId(i, opts.numBuckets);
         rowBuilder.setLength(mapId.length());
         updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
       }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
deleted file mode 100644
index f7af791..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import org.apache.fluo.recipes.core.types.TypedLoader;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-
-public class DocumentLoader extends TypedLoader {
-
-  String docid;
-  String refs[];
-
-  DocumentLoader(String docid, String... refs) {
-    this.docid = docid;
-    this.refs = refs;
-  }
-
-  @Override
-  public void load(TypedTransactionBase tx, Context context) throws Exception {
-    tx.mutate().row("d:" + docid).fam("content").qual("new").set(String.join(" ", refs));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
deleted file mode 100644
index c4c11d8..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.export.ExportTestBase.RefExporter;
-import org.apache.fluo.recipes.core.types.TypedObserver;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-
-public class DocumentObserver extends TypedObserver {
-
-  ExportQueue<String, RefUpdates> refExportQueue;
-
-  @Override
-  public void init(Context context) throws Exception {
-    refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
-  }
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
-  }
-
-  @Override
-  public void process(TypedTransactionBase tx, Bytes row, Column col) {
-    String newContent = tx.get().row(row).col(col).toString();
-    Set<String> newRefs = new HashSet<>(Arrays.asList(newContent.split(" ")));
-    Set<String> currentRefs =
-        new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("")
-            .split(" ")));
-
-    Set<String> addedRefs = new HashSet<>(newRefs);
-    addedRefs.removeAll(currentRefs);
-
-    Set<String> deletedRefs = new HashSet<>(currentRefs);
-    deletedRefs.removeAll(newRefs);
-
-    String key = row.toString().substring(2);
-    RefUpdates val = new RefUpdates(addedRefs, deletedRefs);
-
-    refExportQueue.add(tx, key, val);
-
-    tx.mutate().row(row).fam("content").qual("current").set(newContent);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
deleted file mode 100644
index d54982b..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportBufferIT extends ExportTestBase {
-
-  @Override
-  protected int getNumBuckets() {
-    return 2;
-  }
-
-  @Override
-  protected Integer getBufferSize() {
-    return 1024;
-  }
-
-  @Test
-  public void testSmallExportBuffer() {
-    // try setting the export buffer size small. Make sure everything is exported.
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      ExportQueue<String, RefUpdates> refExportQueue =
-          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
-      try (Transaction tx = fc.newTransaction()) {
-        for (int i = 0; i < 1000; i++) {
-          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
-        }
-
-        tx.commit();
-      }
-    }
-
-    miniFluo.waitForObservers();
-
-    Map<String, Set<String>> erefs = getExportedReferees();
-    Map<String, Set<String>> expected = new HashMap<>();
-
-    for (int i = 0; i < 1000; i++) {
-      expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
-      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
-    }
-
-    assertEquals(expected, erefs);
-    int prevNumExportCalls = getNumExportCalls();
-    Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
-                                                // calls
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      ExportQueue<String, RefUpdates> refExportQueue =
-          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
-      try (Transaction tx = fc.newTransaction()) {
-        for (int i = 0; i < 1000; i++) {
-          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
-        }
-
-        tx.commit();
-      }
-    }
-
-    miniFluo.waitForObservers();
-
-    erefs = getExportedReferees();
-    expected = new HashMap<>();
-
-    for (int i = 0; i < 1000; i++) {
-      expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
-      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
-    }
-
-    assertEquals(expected, erefs);
-    prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
-    Assert.assertTrue(prevNumExportCalls > 10);
-  }
-
-  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
-    if (!expected.equals(actual)) {
-      System.out.println("*** diff ***");
-      diff(expected, actual);
-      Assert.fail();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
deleted file mode 100644
index b4e167c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportQueueIT extends ExportTestBase {
-
-  @Test
-  public void testExport() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005", "0002"));
-        loader.execute(new DocumentLoader("0002", "0999", "0042"));
-        loader.execute(new DocumentLoader("0005", "0999", "0042"));
-        loader.execute(new DocumentLoader("0042", "0999"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005", "0042"));
-      }
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
-        loader.execute(new DocumentLoader("0005", "0002"));
-      }
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0005", "0003"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
-      Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
-
-    }
-  }
-
-  @Test
-  public void exportStressTest() {
-    FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
-    config.setLoaderQueueSize(100);
-    config.setLoaderThreads(20);
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-
-      loadRandom(fc, 1000, 500);
-
-      miniFluo.waitForObservers();
-
-      diff(getFluoReferees(fc), getExportedReferees());
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 500);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 10000);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 10000);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
deleted file mode 100644
index fd3ece1..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import com.google.common.collect.Iterators;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-public class ExportTestBase {
-
-  private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
-  private static int exportCalls = 0;
-
-  protected static Set<String> getExportedReferees(String node) {
-    synchronized (globalExports) {
-      Set<String> ret = new HashSet<>();
-
-      Map<String, RefInfo> referees = globalExports.get(node);
-
-      if (referees == null) {
-        return ret;
-      }
-
-      referees.forEach((k, v) -> {
-        if (!v.deleted)
-          ret.add(k);
-      });
-
-      return ret;
-    }
-  }
-
-  protected static Map<String, Set<String>> getExportedReferees() {
-    synchronized (globalExports) {
-
-      Map<String, Set<String>> ret = new HashMap<>();
-
-      for (String k : globalExports.keySet()) {
-        Set<String> referees = getExportedReferees(k);
-        if (referees.size() > 0) {
-          ret.put(k, referees);
-        }
-      }
-
-      return ret;
-    }
-  }
-
-  protected static int getNumExportCalls() {
-    synchronized (globalExports) {
-      return exportCalls;
-    }
-  }
-
-  public static class RefExporter extends Exporter<String, RefUpdates> {
-
-    public static final String QUEUE_ID = "req";
-
-    private void updateExports(String key, long seq, String addedRef, boolean deleted) {
-      Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
-      referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
-    }
-
-    @Override
-    protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
-      ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
-      Iterators.addAll(exportList, exportIterator);
-
-      synchronized (globalExports) {
-        exportCalls++;
-
-        for (SequencedExport<String, RefUpdates> se : exportList) {
-          for (String addedRef : se.getValue().getAddedRefs()) {
-            updateExports(se.getKey(), se.getSequence(), addedRef, false);
-          }
-
-          for (String deletedRef : se.getValue().getDeletedRefs()) {
-            updateExports(se.getKey(), se.getSequence(), deletedRef, true);
-          }
-        }
-      }
-    }
-  }
-
-  protected MiniFluo miniFluo;
-
-  protected int getNumBuckets() {
-    return 13;
-  }
-
-  protected Integer getBufferSize() {
-    return null;
-  }
-
-  @Before
-  public void setUpFluo() throws Exception {
-    FileUtils.deleteQuietly(new File("target/mini"));
-
-    FluoConfiguration props = new FluoConfiguration();
-    props.setApplicationName("eqt");
-    props.setWorkerThreads(20);
-    props.setMiniDataDir("target/mini");
-
-    ObserverSpecification doc = new ObserverSpecification(DocumentObserver.class.getName());
-    props.addObserver(doc);
-
-    SimpleSerializer.setSerializer(props, GsonSerializer.class);
-
-    ExportQueue.Options exportQueueOpts =
-        new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
-            RefUpdates.class, getNumBuckets());
-
-    if (getBufferSize() != null) {
-      exportQueueOpts.setBufferSize(getBufferSize());
-    }
-
-    ExportQueue.configure(props, exportQueueOpts);
-
-    miniFluo = FluoFactory.newMiniFluo(props);
-
-    globalExports.clear();
-    exportCalls = 0;
-  }
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  protected static Set<String> ns(String... sa) {
-    return new HashSet<>(Arrays.asList(sa));
-  }
-
-  protected static String nk(int i) {
-    return String.format("%06d", i);
-  }
-
-  protected static Set<String> ns(int... ia) {
-    HashSet<String> ret = new HashSet<>();
-    for (int i : ia) {
-      ret.add(nk(i));
-    }
-    return ret;
-  }
-
-  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
-      FluoClient fc) {
-    if (!expected.equals(actual)) {
-      System.out.println("*** diff ***");
-      diff(expected, actual);
-      System.out.println("*** fluo dump ***");
-      dump(fc);
-      System.out.println("*** map dump ***");
-
-      Assert.fail();
-    }
-  }
-
-  protected void loadRandom(FluoClient fc, int num, int maxDocId) {
-    try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-      Random rand = new Random();
-
-      for (int i = 0; i < num; i++) {
-        String docid = String.format("%05d", rand.nextInt(maxDocId));
-        String[] refs = new String[rand.nextInt(20) + 1];
-        for (int j = 0; j < refs.length; j++) {
-          refs[j] = String.format("%05d", rand.nextInt(maxDocId));
-        }
-
-        loader.execute(new DocumentLoader(docid, refs));
-      }
-    }
-  }
-
-  protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
-    HashSet<String> allKeys = new HashSet<>(fr.keySet());
-    allKeys.addAll(er.keySet());
-
-    for (String k : allKeys) {
-      Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
-      Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
-
-      HashSet<String> sub1 = new HashSet<>(s1);
-      sub1.removeAll(s2);
-
-      HashSet<String> sub2 = new HashSet<>(s2);
-      sub2.removeAll(s1);
-
-      if (sub1.size() > 0 || sub2.size() > 0) {
-        System.out.println(k + " " + sub1 + " " + sub2);
-      }
-
-    }
-  }
-
-  protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
-    Map<String, Set<String>> fluoReferees = new HashMap<>();
-
-    try (Snapshot snap = fc.newSnapshot()) {
-
-      Column currCol = new Column("content", "current");
-      RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build();
-
-      for (ColumnScanner columnScanner : rowScanner) {
-        String docid = columnScanner.getsRow().substring(2);
-
-        for (ColumnValue columnValue : columnScanner) {
-          String[] refs = columnValue.getsValue().split(" ");
-
-          for (String ref : refs) {
-            if (ref.isEmpty())
-              continue;
-
-            fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
-          }
-        }
-      }
-    }
-    return fluoReferees;
-  }
-
-  public static void dump(FluoClient fc) {
-    try (Snapshot snap = fc.newSnapshot()) {
-      CellScanner scanner = snap.scanner().build();
-      scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "]  col:["
-          + rcv.getColumn() + "]  val:[" + rcv.getValue() + "]"));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
deleted file mode 100644
index 2d45ff3..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import java.nio.charset.StandardCharsets;
-
-import com.google.gson.Gson;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
-
-public class GsonSerializer implements SimpleSerializer {
-
-  private Gson gson = new Gson();
-
-  @Override
-  public void init(SimpleConfiguration appConfig) {
-
-  }
-
-  @Override
-  public <T> byte[] serialize(T obj) {
-    return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
-  }
-
-  @Override
-  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
-    return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
index 60982bd..376223b 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
@@ -17,6 +17,7 @@ package org.apache.fluo.recipes.core.export;
 
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.export.ExportQueue;
 import org.apache.fluo.recipes.core.export.ExportQueue.Options;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
deleted file mode 100644
index f4d1c76..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-class RefInfo {
-  long seq;
-  boolean deleted;
-
-  public RefInfo(long seq, boolean deleted) {
-    this.seq = seq;
-    this.deleted = deleted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
deleted file mode 100644
index efa94dd..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.export;
-
-import java.util.Set;
-
-public class RefUpdates {
-  private Set<String> addedRefs;
-  private Set<String> deletedRefs;
-
-  public RefUpdates() {}
-
-  public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
-    this.addedRefs = addedRefs;
-    this.deletedRefs = deletedRefs;
-  }
-
-  public Set<String> getAddedRefs() {
-    return addedRefs;
-  }
-
-  public Set<String> getDeletedRefs() {
-    return deletedRefs;
-  }
-
-  @Override
-  public String toString() {
-    return "added:" + addedRefs + " deleted:" + deletedRefs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java
new file mode 100644
index 0000000..6b8ca36
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentLoader extends TypedLoader {
+
+  String docid;
+  String refs[];
+
+  DocumentLoader(String docid, String... refs) {
+    this.docid = docid;
+    this.refs = refs;
+  }
+
+  @Override
+  public void load(TypedTransactionBase tx, Context context) throws Exception {
+    tx.mutate().row("d:" + docid).fam("content").qual("new").set(String.join(" ", refs));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java
new file mode 100644
index 0000000..c9765a7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.export.it.ExportTestBase.RefExporter;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentObserver extends TypedObserver {
+
+  ExportQueue<String, RefUpdates> refExportQueue;
+
+  @Override
+  public void init(Context context) throws Exception {
+    refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
+  }
+
+  @Override
+  public void process(TypedTransactionBase tx, Bytes row, Column col) {
+    String newContent = tx.get().row(row).col(col).toString();
+    Set<String> newRefs = new HashSet<>(Arrays.asList(newContent.split(" ")));
+    Set<String> currentRefs =
+        new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("")
+            .split(" ")));
+
+    Set<String> addedRefs = new HashSet<>(newRefs);
+    addedRefs.removeAll(currentRefs);
+
+    Set<String> deletedRefs = new HashSet<>(currentRefs);
+    deletedRefs.removeAll(newRefs);
+
+    String key = row.toString().substring(2);
+    RefUpdates val = new RefUpdates(addedRefs, deletedRefs);
+
+    refExportQueue.add(tx, key, val);
+
+    tx.mutate().row(row).fam("content").qual("current").set(newContent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java
new file mode 100644
index 0000000..d937ac8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportBufferIT extends ExportTestBase {
+
+  @Override
+  protected int getNumBuckets() {
+    return 2;
+  }
+
+  @Override
+  protected Integer getBufferSize() {
+    return 1024;
+  }
+
+  @Test
+  public void testSmallExportBuffer() {
+    // try setting the export buffer size small. Make sure everything is exported.
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      ExportQueue<String, RefUpdates> refExportQueue =
+          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+      try (Transaction tx = fc.newTransaction()) {
+        for (int i = 0; i < 1000; i++) {
+          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
+        }
+
+        tx.commit();
+      }
+    }
+
+    miniFluo.waitForObservers();
+
+    Map<String, Set<String>> erefs = getExportedReferees();
+    Map<String, Set<String>> expected = new HashMap<>();
+
+    for (int i = 0; i < 1000; i++) {
+      expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
+      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+    }
+
+    assertEquals(expected, erefs);
+    int prevNumExportCalls = getNumExportCalls();
+    Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
+                                                // calls
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      ExportQueue<String, RefUpdates> refExportQueue =
+          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+      try (Transaction tx = fc.newTransaction()) {
+        for (int i = 0; i < 1000; i++) {
+          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
+        }
+
+        tx.commit();
+      }
+    }
+
+    miniFluo.waitForObservers();
+
+    erefs = getExportedReferees();
+    expected = new HashMap<>();
+
+    for (int i = 0; i < 1000; i++) {
+      expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
+      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+    }
+
+    assertEquals(expected, erefs);
+    prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
+    Assert.assertTrue(prevNumExportCalls > 10);
+  }
+
+  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
+    if (!expected.equals(actual)) {
+      System.out.println("*** diff ***");
+      diff(expected, actual);
+      Assert.fail();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java
new file mode 100644
index 0000000..55c1a27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportQueueIT extends ExportTestBase {
+
+  @Test
+  public void testExport() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005", "0002"));
+        loader.execute(new DocumentLoader("0002", "0999", "0042"));
+        loader.execute(new DocumentLoader("0005", "0999", "0042"));
+        loader.execute(new DocumentLoader("0042", "0999"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005", "0042"));
+      }
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
+        loader.execute(new DocumentLoader("0005", "0002"));
+      }
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0005", "0003"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
+      Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
+
+    }
+  }
+
+  @Test
+  public void exportStressTest() {
+    FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
+    config.setLoaderQueueSize(100);
+    config.setLoaderThreads(20);
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+
+      loadRandom(fc, 1000, 500);
+
+      miniFluo.waitForObservers();
+
+      diff(getFluoReferees(fc), getExportedReferees());
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 500);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 10000);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 10000);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
new file mode 100644
index 0000000..3c210a9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.export.Exporter;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public class ExportTestBase {
+
+  private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
+  private static int exportCalls = 0;
+
+  protected static Set<String> getExportedReferees(String node) {
+    synchronized (globalExports) {
+      Set<String> ret = new HashSet<>();
+
+      Map<String, RefInfo> referees = globalExports.get(node);
+
+      if (referees == null) {
+        return ret;
+      }
+
+      referees.forEach((k, v) -> {
+        if (!v.deleted)
+          ret.add(k);
+      });
+
+      return ret;
+    }
+  }
+
+  protected static Map<String, Set<String>> getExportedReferees() {
+    synchronized (globalExports) {
+
+      Map<String, Set<String>> ret = new HashMap<>();
+
+      for (String k : globalExports.keySet()) {
+        Set<String> referees = getExportedReferees(k);
+        if (referees.size() > 0) {
+          ret.put(k, referees);
+        }
+      }
+
+      return ret;
+    }
+  }
+
+  protected static int getNumExportCalls() {
+    synchronized (globalExports) {
+      return exportCalls;
+    }
+  }
+
+  public static class RefExporter extends Exporter<String, RefUpdates> {
+
+    public static final String QUEUE_ID = "req";
+
+    private void updateExports(String key, long seq, String addedRef, boolean deleted) {
+      Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
+      referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
+    }
+
+    @Override
+    protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
+      ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
+      Iterators.addAll(exportList, exportIterator);
+
+      synchronized (globalExports) {
+        exportCalls++;
+
+        for (SequencedExport<String, RefUpdates> se : exportList) {
+          for (String addedRef : se.getValue().getAddedRefs()) {
+            updateExports(se.getKey(), se.getSequence(), addedRef, false);
+          }
+
+          for (String deletedRef : se.getValue().getDeletedRefs()) {
+            updateExports(se.getKey(), se.getSequence(), deletedRef, true);
+          }
+        }
+      }
+    }
+  }
+
+  protected MiniFluo miniFluo;
+
+  protected int getNumBuckets() {
+    return 13;
+  }
+
+  protected Integer getBufferSize() {
+    return null;
+  }
+
+  @Before
+  public void setUpFluo() throws Exception {
+    FileUtils.deleteQuietly(new File("target/mini"));
+
+    FluoConfiguration props = new FluoConfiguration();
+    props.setApplicationName("eqt");
+    props.setWorkerThreads(20);
+    props.setMiniDataDir("target/mini");
+
+    ObserverSpecification doc = new ObserverSpecification(DocumentObserver.class.getName());
+    props.addObserver(doc);
+
+    SimpleSerializer.setSerializer(props, GsonSerializer.class);
+
+    ExportQueue.Options exportQueueOpts =
+        new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
+            RefUpdates.class, getNumBuckets());
+
+    if (getBufferSize() != null) {
+      exportQueueOpts.setBufferSize(getBufferSize());
+    }
+
+    ExportQueue.configure(props, exportQueueOpts);
+
+    miniFluo = FluoFactory.newMiniFluo(props);
+
+    globalExports.clear();
+    exportCalls = 0;
+  }
+
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo != null) {
+      miniFluo.close();
+    }
+  }
+
+  protected static Set<String> ns(String... sa) {
+    return new HashSet<>(Arrays.asList(sa));
+  }
+
+  protected static String nk(int i) {
+    return String.format("%06d", i);
+  }
+
+  protected static Set<String> ns(int... ia) {
+    HashSet<String> ret = new HashSet<>();
+    for (int i : ia) {
+      ret.add(nk(i));
+    }
+    return ret;
+  }
+
+  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
+      FluoClient fc) {
+    if (!expected.equals(actual)) {
+      System.out.println("*** diff ***");
+      diff(expected, actual);
+      System.out.println("*** fluo dump ***");
+      dump(fc);
+      System.out.println("*** map dump ***");
+
+      Assert.fail();
+    }
+  }
+
+  protected void loadRandom(FluoClient fc, int num, int maxDocId) {
+    try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+      Random rand = new Random();
+
+      for (int i = 0; i < num; i++) {
+        String docid = String.format("%05d", rand.nextInt(maxDocId));
+        String[] refs = new String[rand.nextInt(20) + 1];
+        for (int j = 0; j < refs.length; j++) {
+          refs[j] = String.format("%05d", rand.nextInt(maxDocId));
+        }
+
+        loader.execute(new DocumentLoader(docid, refs));
+      }
+    }
+  }
+
+  protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
+    HashSet<String> allKeys = new HashSet<>(fr.keySet());
+    allKeys.addAll(er.keySet());
+
+    for (String k : allKeys) {
+      Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
+      Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
+
+      HashSet<String> sub1 = new HashSet<>(s1);
+      sub1.removeAll(s2);
+
+      HashSet<String> sub2 = new HashSet<>(s2);
+      sub2.removeAll(s1);
+
+      if (sub1.size() > 0 || sub2.size() > 0) {
+        System.out.println(k + " " + sub1 + " " + sub2);
+      }
+
+    }
+  }
+
+  protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
+    Map<String, Set<String>> fluoReferees = new HashMap<>();
+
+    try (Snapshot snap = fc.newSnapshot()) {
+
+      Column currCol = new Column("content", "current");
+      RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build();
+
+      for (ColumnScanner columnScanner : rowScanner) {
+        String docid = columnScanner.getsRow().substring(2);
+
+        for (ColumnValue columnValue : columnScanner) {
+          String[] refs = columnValue.getsValue().split(" ");
+
+          for (String ref : refs) {
+            if (ref.isEmpty())
+              continue;
+
+            fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
+          }
+        }
+      }
+    }
+    return fluoReferees;
+  }
+
+  public static void dump(FluoClient fc) {
+    try (Snapshot snap = fc.newSnapshot()) {
+      CellScanner scanner = snap.scanner().build();
+      scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "]  col:["
+          + rcv.getColumn() + "]  val:[" + rcv.getValue() + "]"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java
new file mode 100644
index 0000000..aa610f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import java.nio.charset.StandardCharsets;
+
+import com.google.gson.Gson;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class GsonSerializer implements SimpleSerializer {
+
+  private Gson gson = new Gson();
+
+  @Override
+  public void init(SimpleConfiguration appConfig) {
+
+  }
+
+  @Override
+  public <T> byte[] serialize(T obj) {
+    return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+    return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java
new file mode 100644
index 0000000..e08104b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+class RefInfo {
+  long seq;
+  boolean deleted;
+
+  public RefInfo(long seq, boolean deleted) {
+    this.seq = seq;
+    this.deleted = deleted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefUpdates.java
new file mode 100644
index 0000000..cd68049
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefUpdates.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export.it;
+
+import java.util.Set;
+
+public class RefUpdates {
+  private Set<String> addedRefs;
+  private Set<String> deletedRefs;
+
+  public RefUpdates() {}
+
+  public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
+    this.addedRefs = addedRefs;
+    this.deletedRefs = deletedRefs;
+  }
+
+  public Set<String> getAddedRefs() {
+    return addedRefs;
+  }
+
+  public Set<String> getDeletedRefs() {
+    return deletedRefs;
+  }
+
+  @Override
+  public String toString() {
+    return "added:" + addedRefs + " deleted:" + deletedRefs;
+  }
+}


Mime
View raw message