fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] incubator-fluo-recipes git commit: Misc release prep cleanup
Date Mon, 17 Oct 2016 14:56:27 GMT
Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 573aeb788 -> 10ccb16ae


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
deleted file mode 100644
index 3f3a84c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypeLayer;
-import org.apache.fluo.recipes.core.types.TypedSnapshot;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test configures a small buffer size and verifies that multiple passes are made to process
- * updates.
- */
-public class BigUpdateIT {
-  private static final TypeLayer tl = new TypeLayer(new StringEncoder());
-
-  private MiniFluo miniFluo;
-
-  private CollisionFreeMap<String, Long> wcMap;
-
-  static final String MAP_ID = "bu";
-
-  public static class LongCombiner implements Combiner<String, Long> {
-
-    @Override
-    public Optional<Long> combine(String key, Iterator<Long> updates) {
-      long[] count = new long[] {0};
-      updates.forEachRemaining(l -> count[0] += l);
-      return Optional.of(count[0]);
-    }
-  }
-
-  static final Column DSCOL = new Column("debug", "sum");
-
-  private static AtomicInteger globalUpdates = new AtomicInteger(0);
-
-  public static class MyObserver extends UpdateObserver<String, Long> {
-
-    @Override
-    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
-      TypedTransactionBase ttx = tl.wrap(tx);
-
-      Map<String, Long> expectedOld = new HashMap<>();
-
-
-      while (updates.hasNext()) {
-        Update<String, Long> update = updates.next();
-
-        if (update.getOldValue().isPresent()) {
-          expectedOld.put("side:" + update.getKey(), update.getOldValue().get());
-        }
-
-        ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get());
-      }
-
-      // get last values set to verify same as passed in old value
-      Map<String, Long> actualOld =
-          Maps.transformValues(
-              ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL))
-                  .toStringMap(), m -> m.get(DSCOL).toLong());
-
-      MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld);
-
-      Assert.assertTrue(diff.toString(), diff.areEqual());
-
-      globalUpdates.incrementAndGet();
-    }
-  }
-
-  @Before
-  public void setUpFluo() throws Exception {
-    FileUtils.deleteQuietly(new File("target/mini"));
-
-    FluoConfiguration props = new FluoConfiguration();
-    props.setApplicationName("eqt");
-    props.setWorkerThreads(20);
-    props.setMiniDataDir("target/mini");
-
-    SimpleSerializer.setSerializer(props, TestSerializer.class);
-
-    CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class,
-        MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10));
-
-    miniFluo = FluoFactory.newMiniFluo(props);
-
-    wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
-
-    globalUpdates.set(0);
-  }
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  @Test
-  public void testBigUpdates() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      updateMany(fc);
-
-      miniFluo.waitForObservers();
-
-      int numUpdates = 0;
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 1, 1000);
-        numUpdates = globalUpdates.get();
-        // there are two buckets, expect update processing at least twice per bucket
-        Assert.assertTrue(numUpdates >= 4);
-      }
-
-      updateMany(fc);
-      updateMany(fc);
-
-      miniFluo.waitForObservers();
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 3, 1000);
-        numUpdates = globalUpdates.get() - numUpdates;
-        Assert.assertTrue(numUpdates >= 4);
-      }
-
-      for (int i = 0; i < 10; i++) {
-        updateMany(fc);
-      }
-
-      miniFluo.waitForObservers();
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 13, 1000);
-        numUpdates = globalUpdates.get() - numUpdates;
-        Assert.assertTrue(numUpdates >= 4);
-      }
-    }
-  }
-
-  private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
-    RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
-
-    int row = 0;
-
-    for (ColumnScanner columns : rows) {
-      Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow());
-
-      for (ColumnValue columnValue : columns) {
-        Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn());
-        Assert
-            .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue());
-      }
-    }
-
-    Assert.assertEquals(expectedRows, row);
-  }
-
-  private void updateMany(FluoClient fc) {
-    try (Transaction tx = fc.newTransaction()) {
-      Map<String, Long> updates = new HashMap<>();
-      for (int i = 0; i < 1000; i++) {
-        updates.put(String.format("%06d", i), 1L);
-      }
-
-      wcMap.update(tx, updates);
-      tx.commit();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
deleted file mode 100644
index f5c832a..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CollisionFreeMapIT {
-
-  private MiniFluo miniFluo;
-
-  private CollisionFreeMap<String, Long> wcMap;
-
-  static final String MAP_ID = "wcm";
-
-  @Before
-  public void setUpFluo() throws Exception {
-    FileUtils.deleteQuietly(new File("target/mini"));
-
-    FluoConfiguration props = new FluoConfiguration();
-    props.setApplicationName("eqt");
-    props.setWorkerThreads(20);
-    props.setMiniDataDir("target/mini");
-
-    props.addObserver(new ObserverSpecification(DocumentObserver.class.getName()));
-
-    SimpleSerializer.setSerializer(props, TestSerializer.class);
-
-    CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class,
-        WordCountObserver.class, String.class, Long.class, 17));
-
-    miniFluo = FluoFactory.newMiniFluo(props);
-
-    wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
-  }
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  private Map<String, Long> getComputedWordCounts(FluoClient fc) {
-    Map<String, Long> counts = new HashMap<>();
-
-    try (Snapshot snap = fc.newSnapshot()) {
-
-      CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build();
-
-      for (RowColumnValue rcv : scanner) {
-        String[] tokens = rcv.getsRow().split(":");
-        String word = tokens[2];
-        Long count = Long.valueOf(tokens[1]);
-
-        Assert.assertFalse("Word seen twice in index " + word, counts.containsKey(word));
-
-        counts.put(word, count);
-      }
-    }
-
-    return counts;
-  }
-
-  private Map<String, Long> computeWordCounts(FluoClient fc) {
-    Map<String, Long> counts = new HashMap<>();
-
-    try (Snapshot snap = fc.newSnapshot()) {
-
-
-      CellScanner scanner =
-          snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build();
-
-      for (RowColumnValue rcv : scanner) {
-        String[] words = rcv.getsValue().split("\\s+");
-        for (String word : words) {
-          if (word.isEmpty()) {
-            continue;
-          }
-
-          counts.merge(word, 1L, Long::sum);
-        }
-      }
-    }
-
-    return counts;
-  }
-
-  @Test
-  public void testGet() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      try (Transaction tx = fc.newTransaction()) {
-        wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L));
-        tx.commit();
-      }
-
-      try (Transaction tx = fc.newTransaction()) {
-        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L));
-        tx.commit();
-      }
-
-      try (Transaction tx = fc.newTransaction()) {
-        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L));
-        tx.commit();
-      }
-
-      // try reading possibly before observer combines... will either see outstanding updates or a
-      // current value
-      try (Snapshot snap = fc.newSnapshot()) {
-        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
-        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
-        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
-      }
-
-      miniFluo.waitForObservers();
-
-      // in this case there should be no updates, only a current value
-      try (Snapshot snap = fc.newSnapshot()) {
-        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
-        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
-        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
-      }
-
-      Map<String, Long> expectedCounts = new HashMap<>();
-      expectedCounts.put("cat", 4L);
-      expectedCounts.put("dog", 7L);
-      expectedCounts.put("fish", 2L);
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (Transaction tx = fc.newTransaction()) {
-        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L));
-        tx.commit();
-      }
-
-      // there may be outstanding update and a current value for the key in this case
-      try (Snapshot snap = fc.newSnapshot()) {
-        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
-        Assert.assertNull(wcMap.get(snap, "dog"));
-        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
-      }
-
-      miniFluo.waitForObservers();
-
-      try (Snapshot snap = fc.newSnapshot()) {
-        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
-        Assert.assertNull(wcMap.get(snap, "dog"));
-        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
-      }
-
-      expectedCounts.put("cat", 5L);
-      expectedCounts.remove("dog");
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-    }
-  }
-
-  @Test
-  public void testBasic() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0001", "dog cat"));
-        loader.execute(new DocumentLoader("0002", "cat hamster"));
-        loader.execute(new DocumentLoader("0003", "milk bread cat food"));
-        loader.execute(new DocumentLoader("0004", "zoo big cat"));
-      }
-
-      miniFluo.waitForObservers();
-
-      try (Snapshot snap = fc.newSnapshot()) {
-        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
-        Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk"));
-      }
-
-      Map<String, Long> expectedCounts = new HashMap<>();
-      expectedCounts.put("dog", 1L);
-      expectedCounts.put("cat", 4L);
-      expectedCounts.put("hamster", 1L);
-      expectedCounts.put("milk", 1L);
-      expectedCounts.put("bread", 1L);
-      expectedCounts.put("food", 1L);
-      expectedCounts.put("zoo", 1L);
-      expectedCounts.put("big", 1L);
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0001", "dog feline"));
-      }
-
-      miniFluo.waitForObservers();
-
-      expectedCounts.put("cat", 3L);
-      expectedCounts.put("feline", 1L);
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        // swap contents of two documents... should not change doc counts
-        loader.execute(new DocumentLoader("0003", "zoo big cat"));
-        loader.execute(new DocumentLoader("0004", "milk bread cat food"));
-      }
-
-      miniFluo.waitForObservers();
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0003", "zoo big cat"));
-        loader.execute(new DocumentLoader("0004", "zoo big cat"));
-      }
-
-      miniFluo.waitForObservers();
-
-      expectedCounts.put("zoo", 2L);
-      expectedCounts.put("big", 2L);
-      expectedCounts.remove("milk");
-      expectedCounts.remove("bread");
-      expectedCounts.remove("food");
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0002", "cat cat hamster hamster"));
-      }
-
-      miniFluo.waitForObservers();
-
-      expectedCounts.put("cat", 4L);
-      expectedCounts.put("hamster", 2L);
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0002", "dog hamster"));
-      }
-
-      miniFluo.waitForObservers();
-
-      expectedCounts.put("cat", 2L);
-      expectedCounts.put("hamster", 1L);
-      expectedCounts.put("dog", 2L);
-
-      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
-    }
-  }
-
-  private static String randDocId(Random rand) {
-    return String.format("%04d", rand.nextInt(5000));
-  }
-
-  private static String randomDocument(Random rand) {
-    StringBuilder sb = new StringBuilder();
-
-    String sep = "";
-    for (int i = 2; i < rand.nextInt(18); i++) {
-      sb.append(sep);
-      sep = " ";
-      sb.append(String.format("%05d", rand.nextInt(50000)));
-    }
-
-    return sb.toString();
-  }
-
-  public void diff(Map<String, Long> m1, Map<String, Long> m2) {
-    for (String word : m1.keySet()) {
-      Long v1 = m1.get(word);
-      Long v2 = m2.get(word);
-
-      if (v2 == null || !v1.equals(v2)) {
-        System.out.println(word + " " + v1 + " != " + v2);
-      }
-    }
-
-    for (String word : m2.keySet()) {
-      Long v1 = m1.get(word);
-      Long v2 = m2.get(word);
-
-      if (v1 == null) {
-        System.out.println(word + " null != " + v2);
-      }
-    }
-  }
-
-  @Test
-  public void testStress() throws Exception {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      Random rand = new Random();
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        for (int i = 0; i < 1000; i++) {
-          loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
-        }
-      }
-
-      miniFluo.waitForObservers();
-      assertWordCountsEqual(fc);
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        for (int i = 0; i < 100; i++) {
-          loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
-        }
-      }
-
-      miniFluo.waitForObservers();
-      assertWordCountsEqual(fc);
-    }
-  }
-
-  private void assertWordCountsEqual(FluoClient fc) {
-    Map<String, Long> expected = computeWordCounts(fc);
-    Map<String, Long> actual = getComputedWordCounts(fc);
-    if (!expected.equals(actual)) {
-      diff(expected, actual);
-      Assert.fail();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
deleted file mode 100644
index 54f5ee1..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import org.apache.fluo.recipes.core.types.TypedLoader;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-
-public class DocumentLoader extends TypedLoader {
-
-  String docid;
-  String doc;
-
-  DocumentLoader(String docid, String doc) {
-    this.docid = docid;
-    this.doc = doc;
-  }
-
-  @Override
-  public void load(TypedTransactionBase tx, Context context) throws Exception {
-    tx.mutate().row("d:" + docid).fam("content").qual("new").set(doc);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
deleted file mode 100644
index 2c79f45..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.TypedObserver;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-
-public class DocumentObserver extends TypedObserver {
-
-  CollisionFreeMap<String, Long> wcm;
-
-  @Override
-  public void init(Context context) throws Exception {
-    wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration());
-  }
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
-  }
-
-  static Map<String, Long> getWordCounts(String doc) {
-    Map<String, Long> wordCounts = new HashMap<>();
-    String[] words = doc.split(" ");
-    for (String word : words) {
-      if (word.isEmpty()) {
-        continue;
-      }
-      wordCounts.merge(word, 1L, Long::sum);
-    }
-
-    return wordCounts;
-  }
-
-  @Override
-  public void process(TypedTransactionBase tx, Bytes row, Column col) {
-    String newContent = tx.get().row(row).col(col).toString();
-    String currentContent = tx.get().row(row).fam("content").qual("current").toString("");
-
-    Map<String, Long> newWordCounts = getWordCounts(newContent);
-    Map<String, Long> currentWordCounts = getWordCounts(currentContent);
-
-    Map<String, Long> changes = calculateChanges(newWordCounts, currentWordCounts);
-
-    wcm.update(tx, changes);
-
-    tx.mutate().row(row).fam("content").qual("current").set(newContent);
-  }
-
-  private static Map<String, Long> calculateChanges(Map<String, Long> newCounts,
-      Map<String, Long> currCounts) {
-    Map<String, Long> changes = new HashMap<>();
-
-    // guava Maps class
-    MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts);
-
-    // compute the diffs for words that changed
-    changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue()
-        - vDiff.leftValue()));
-
-    // add all new words
-    changes.putAll(diffs.entriesOnlyOnRight());
-
-    // subtract all words no longer present
-    changes.putAll(Maps.transformValues(diffs.entriesOnlyOnLeft(), l -> l * -1));
-
-    return changes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
index 37a2443..30f93ff 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
@@ -25,6 +25,7 @@ import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.recipes.core.common.TableOptimizations;
 import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.apache.fluo.recipes.core.map.it.WordCountCombiner;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
deleted file mode 100644
index b59705a..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
-
-public class TestSerializer implements SimpleSerializer {
-
-  @Override
-  public <T> byte[] serialize(T obj) {
-    return obj.toString().getBytes();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
-    if (clazz.equals(Long.class)) {
-      return (T) Long.valueOf(new String(serObj));
-    }
-
-    if (clazz.equals(String.class)) {
-      return (T) new String(serObj);
-    }
-
-    throw new IllegalArgumentException();
-  }
-
-  @Override
-  public void init(SimpleConfiguration appConfig) {}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
deleted file mode 100644
index f757c10..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import java.util.Iterator;
-import java.util.Optional;
-
-public class WordCountCombiner implements Combiner<String, Long> {
-  @Override
-  public Optional<Long> combine(String key, Iterator<Long> updates) {
-    long sum = 0;
-
-    while (updates.hasNext()) {
-      sum += updates.next();
-    }
-
-    if (sum == 0) {
-      return Optional.empty();
-    } else {
-      return Optional.of(sum);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
deleted file mode 100644
index 221083c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.map;
-
-import java.util.Iterator;
-import java.util.Optional;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-
-public class WordCountObserver extends UpdateObserver<String, Long> {
-
-  @Override
-  public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
-
-    while (updates.hasNext()) {
-      Update<String, Long> update = updates.next();
-
-      Optional<Long> oldVal = update.getOldValue();
-      Optional<Long> newVal = update.getNewValue();
-
-      if (oldVal.isPresent()) {
-        String oldRow = String.format("iwc:%09d:%s", oldVal.get(), update.getKey());
-        tx.delete(Bytes.of(oldRow), new Column(Bytes.EMPTY, Bytes.EMPTY));
-      }
-
-      if (newVal.isPresent()) {
-        String newRow = String.format("iwc:%09d:%s", newVal.get(), update.getKey());
-        tx.set(Bytes.of(newRow), new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java
new file mode 100644
index 0000000..4213cff
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+import org.apache.fluo.recipes.core.map.Combiner;
+import org.apache.fluo.recipes.core.map.Update;
+import org.apache.fluo.recipes.core.map.UpdateObserver;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedSnapshot;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test configures a small buffer size and verifies that multiple passes are made to process
+ * updates.
+ */
+public class BigUpdateIT {
+  private static final TypeLayer tl = new TypeLayer(new StringEncoder());
+
+  private MiniFluo miniFluo;
+
+  private CollisionFreeMap<String, Long> wcMap;
+
+  static final String MAP_ID = "bu";
+
+  public static class LongCombiner implements Combiner<String, Long> {
+
+    @Override
+    public Optional<Long> combine(String key, Iterator<Long> updates) {
+      long[] count = new long[] {0};
+      updates.forEachRemaining(l -> count[0] += l);
+      return Optional.of(count[0]);
+    }
+  }
+
+  static final Column DSCOL = new Column("debug", "sum");
+
+  private static AtomicInteger globalUpdates = new AtomicInteger(0);
+
+  public static class MyObserver extends UpdateObserver<String, Long> {
+
+    @Override
+    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+      TypedTransactionBase ttx = tl.wrap(tx);
+
+      Map<String, Long> expectedOld = new HashMap<>();
+
+
+      while (updates.hasNext()) {
+        Update<String, Long> update = updates.next();
+
+        if (update.getOldValue().isPresent()) {
+          expectedOld.put("side:" + update.getKey(), update.getOldValue().get());
+        }
+
+        ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get());
+      }
+
+      // get last values set to verify same as passed in old value
+      Map<String, Long> actualOld =
+          Maps.transformValues(
+              ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL))
+                  .toStringMap(), m -> m.get(DSCOL).toLong());
+
+      MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld);
+
+      Assert.assertTrue(diff.toString(), diff.areEqual());
+
+      globalUpdates.incrementAndGet();
+    }
+  }
+
+  @Before
+  public void setUpFluo() throws Exception {
+    FileUtils.deleteQuietly(new File("target/mini"));
+
+    FluoConfiguration props = new FluoConfiguration();
+    props.setApplicationName("eqt");
+    props.setWorkerThreads(20);
+    props.setMiniDataDir("target/mini");
+
+    SimpleSerializer.setSerializer(props, TestSerializer.class);
+
+    CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class,
+        MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10));
+
+    miniFluo = FluoFactory.newMiniFluo(props);
+
+    wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
+
+    globalUpdates.set(0);
+  }
+
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo != null) {
+      miniFluo.close();
+    }
+  }
+
+  @Test
+  public void testBigUpdates() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      updateMany(fc);
+
+      miniFluo.waitForObservers();
+
+      int numUpdates = 0;
+
+      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+        checkUpdates(snap, 1, 1000);
+        numUpdates = globalUpdates.get();
+        // there are two buckets, expect update processing at least twice per bucket
+        Assert.assertTrue(numUpdates >= 4);
+      }
+
+      updateMany(fc);
+      updateMany(fc);
+
+      miniFluo.waitForObservers();
+
+      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+        checkUpdates(snap, 3, 1000);
+        numUpdates = globalUpdates.get() - numUpdates;
+        Assert.assertTrue(numUpdates >= 4);
+      }
+
+      for (int i = 0; i < 10; i++) {
+        updateMany(fc);
+      }
+
+      miniFluo.waitForObservers();
+
+      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+        checkUpdates(snap, 13, 1000);
+        numUpdates = globalUpdates.get() - numUpdates;
+        Assert.assertTrue(numUpdates >= 4);
+      }
+    }
+  }
+
+  private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
+    RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
+
+    int row = 0;
+
+    for (ColumnScanner columns : rows) {
+      Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow());
+
+      for (ColumnValue columnValue : columns) {
+        Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn());
+        Assert
+            .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue());
+      }
+    }
+
+    Assert.assertEquals(expectedRows, row);
+  }
+
+  private void updateMany(FluoClient fc) {
+    try (Transaction tx = fc.newTransaction()) {
+      Map<String, Long> updates = new HashMap<>();
+      for (int i = 0; i < 1000; i++) {
+        updates.put(String.format("%06d", i), 1L);
+      }
+
+      wcMap.update(tx, updates);
+      tx.commit();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java
new file mode 100644
index 0000000..e22ddea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CollisionFreeMapIT {
+
+  private MiniFluo miniFluo;
+
+  private CollisionFreeMap<String, Long> wcMap;
+
+  static final String MAP_ID = "wcm";
+
+  @Before
+  public void setUpFluo() throws Exception {
+    FileUtils.deleteQuietly(new File("target/mini"));
+
+    FluoConfiguration props = new FluoConfiguration();
+    props.setApplicationName("eqt");
+    props.setWorkerThreads(20);
+    props.setMiniDataDir("target/mini");
+
+    props.addObserver(new ObserverSpecification(DocumentObserver.class.getName()));
+
+    SimpleSerializer.setSerializer(props, TestSerializer.class);
+
+    CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class,
+        WordCountObserver.class, String.class, Long.class, 17));
+
+    miniFluo = FluoFactory.newMiniFluo(props);
+
+    wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
+  }
+
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo != null) {
+      miniFluo.close();
+    }
+  }
+
+  private Map<String, Long> getComputedWordCounts(FluoClient fc) {
+    Map<String, Long> counts = new HashMap<>();
+
+    try (Snapshot snap = fc.newSnapshot()) {
+
+      CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build();
+
+      for (RowColumnValue rcv : scanner) {
+        String[] tokens = rcv.getsRow().split(":");
+        String word = tokens[2];
+        Long count = Long.valueOf(tokens[1]);
+
+        Assert.assertFalse("Word seen twice in index " + word, counts.containsKey(word));
+
+        counts.put(word, count);
+      }
+    }
+
+    return counts;
+  }
+
+  private Map<String, Long> computeWordCounts(FluoClient fc) {
+    Map<String, Long> counts = new HashMap<>();
+
+    try (Snapshot snap = fc.newSnapshot()) {
+
+
+      CellScanner scanner =
+          snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build();
+
+      for (RowColumnValue rcv : scanner) {
+        String[] words = rcv.getsValue().split("\\s+");
+        for (String word : words) {
+          if (word.isEmpty()) {
+            continue;
+          }
+
+          counts.merge(word, 1L, Long::sum);
+        }
+      }
+    }
+
+    return counts;
+  }
+
+  @Test
+  public void testGet() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L));
+        tx.commit();
+      }
+
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L));
+        tx.commit();
+      }
+
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L));
+        tx.commit();
+      }
+
+      // try reading possibly before observer combines... will either see outstanding updates or a
+      // current value
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      miniFluo.waitForObservers();
+
+      // in this case there should be no updates, only a current value
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      Map<String, Long> expectedCounts = new HashMap<>();
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("dog", 7L);
+      expectedCounts.put("fish", 2L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L));
+        tx.commit();
+      }
+
+      // there may be outstanding update and a current value for the key in this case
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+        Assert.assertNull(wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      miniFluo.waitForObservers();
+
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+        Assert.assertNull(wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      expectedCounts.put("cat", 5L);
+      expectedCounts.remove("dog");
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+    }
+  }
+
+  @Test
+  public void testBasic() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0001", "dog cat"));
+        loader.execute(new DocumentLoader("0002", "cat hamster"));
+        loader.execute(new DocumentLoader("0003", "milk bread cat food"));
+        loader.execute(new DocumentLoader("0004", "zoo big cat"));
+      }
+
+      miniFluo.waitForObservers();
+
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk"));
+      }
+
+      Map<String, Long> expectedCounts = new HashMap<>();
+      expectedCounts.put("dog", 1L);
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("hamster", 1L);
+      expectedCounts.put("milk", 1L);
+      expectedCounts.put("bread", 1L);
+      expectedCounts.put("food", 1L);
+      expectedCounts.put("zoo", 1L);
+      expectedCounts.put("big", 1L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0001", "dog feline"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 3L);
+      expectedCounts.put("feline", 1L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        // swap contents of two documents... should not change doc counts
+        loader.execute(new DocumentLoader("0003", "zoo big cat"));
+        loader.execute(new DocumentLoader("0004", "milk bread cat food"));
+      }
+
+      miniFluo.waitForObservers();
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0003", "zoo big cat"));
+        loader.execute(new DocumentLoader("0004", "zoo big cat"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("zoo", 2L);
+      expectedCounts.put("big", 2L);
+      expectedCounts.remove("milk");
+      expectedCounts.remove("bread");
+      expectedCounts.remove("food");
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0002", "cat cat hamster hamster"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("hamster", 2L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0002", "dog hamster"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 2L);
+      expectedCounts.put("hamster", 1L);
+      expectedCounts.put("dog", 2L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+    }
+  }
+
+  private static String randDocId(Random rand) {
+    return String.format("%04d", rand.nextInt(5000));
+  }
+
+  private static String randomDocument(Random rand) {
+    StringBuilder sb = new StringBuilder();
+
+    String sep = "";
+    for (int i = 2; i < rand.nextInt(18); i++) {
+      sb.append(sep);
+      sep = " ";
+      sb.append(String.format("%05d", rand.nextInt(50000)));
+    }
+
+    return sb.toString();
+  }
+
+  public void diff(Map<String, Long> m1, Map<String, Long> m2) {
+    for (String word : m1.keySet()) {
+      Long v1 = m1.get(word);
+      Long v2 = m2.get(word);
+
+      if (v2 == null || !v1.equals(v2)) {
+        System.out.println(word + " " + v1 + " != " + v2);
+      }
+    }
+
+    for (String word : m2.keySet()) {
+      Long v1 = m1.get(word);
+      Long v2 = m2.get(word);
+
+      if (v1 == null) {
+        System.out.println(word + " null != " + v2);
+      }
+    }
+  }
+
+  @Test
+  public void testStress() throws Exception {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      Random rand = new Random();
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        for (int i = 0; i < 1000; i++) {
+          loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
+        }
+      }
+
+      miniFluo.waitForObservers();
+      assertWordCountsEqual(fc);
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        for (int i = 0; i < 100; i++) {
+          loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
+        }
+      }
+
+      miniFluo.waitForObservers();
+      assertWordCountsEqual(fc);
+    }
+  }
+
+  private void assertWordCountsEqual(FluoClient fc) {
+    Map<String, Long> expected = computeWordCounts(fc);
+    Map<String, Long> actual = getComputedWordCounts(fc);
+    if (!expected.equals(actual)) {
+      diff(expected, actual);
+      Assert.fail();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java
new file mode 100644
index 0000000..c68b3b1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentLoader extends TypedLoader {
+
+  String docid;
+  String doc;
+
+  DocumentLoader(String docid, String doc) {
+    this.docid = docid;
+    this.doc = doc;
+  }
+
+  @Override
+  public void load(TypedTransactionBase tx, Context context) throws Exception {
+    tx.mutate().row("d:" + docid).fam("content").qual("new").set(doc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java
new file mode 100644
index 0000000..c469151
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentObserver extends TypedObserver {
+
+  CollisionFreeMap<String, Long> wcm;
+
+  @Override
+  public void init(Context context) throws Exception {
+    wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration());
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
+  }
+
+  static Map<String, Long> getWordCounts(String doc) {
+    Map<String, Long> wordCounts = new HashMap<>();
+    String[] words = doc.split(" ");
+    for (String word : words) {
+      if (word.isEmpty()) {
+        continue;
+      }
+      wordCounts.merge(word, 1L, Long::sum);
+    }
+
+    return wordCounts;
+  }
+
+  @Override
+  public void process(TypedTransactionBase tx, Bytes row, Column col) {
+    String newContent = tx.get().row(row).col(col).toString();
+    String currentContent = tx.get().row(row).fam("content").qual("current").toString("");
+
+    Map<String, Long> newWordCounts = getWordCounts(newContent);
+    Map<String, Long> currentWordCounts = getWordCounts(currentContent);
+
+    Map<String, Long> changes = calculateChanges(newWordCounts, currentWordCounts);
+
+    wcm.update(tx, changes);
+
+    tx.mutate().row(row).fam("content").qual("current").set(newContent);
+  }
+
+  private static Map<String, Long> calculateChanges(Map<String, Long> newCounts,
+      Map<String, Long> currCounts) {
+    Map<String, Long> changes = new HashMap<>();
+
+    // guava Maps class
+    MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts);
+
+    // compute the diffs for words that changed
+    changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue()
+        - vDiff.leftValue()));
+
+    // add all new words
+    changes.putAll(diffs.entriesOnlyOnRight());
+
+    // subtract all words no longer present
+    changes.putAll(Maps.transformValues(diffs.entriesOnlyOnLeft(), l -> l * -1));
+
+    return changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java
new file mode 100644
index 0000000..0bc5ff6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class TestSerializer implements SimpleSerializer {
+
+  @Override
+  public <T> byte[] serialize(T obj) {
+    return obj.toString().getBytes();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+    if (clazz.equals(Long.class)) {
+      return (T) Long.valueOf(new String(serObj));
+    }
+
+    if (clazz.equals(String.class)) {
+      return (T) new String(serObj);
+    }
+
+    throw new IllegalArgumentException();
+  }
+
+  @Override
+  public void init(SimpleConfiguration appConfig) {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java
new file mode 100644
index 0000000..df1389f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.recipes.core.map.Combiner;
+
+public class WordCountCombiner implements Combiner<String, Long> {
+  @Override
+  public Optional<Long> combine(String key, Iterator<Long> updates) {
+    long sum = 0;
+
+    while (updates.hasNext()) {
+      sum += updates.next();
+    }
+
+    if (sum == 0) {
+      return Optional.empty();
+    } else {
+      return Optional.of(sum);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java
new file mode 100644
index 0000000..1230d99
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map.it;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.map.Update;
+import org.apache.fluo.recipes.core.map.UpdateObserver;
+
+public class WordCountObserver extends UpdateObserver<String, Long> {
+
+  @Override
+  public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+
+    while (updates.hasNext()) {
+      Update<String, Long> update = updates.next();
+
+      Optional<Long> oldVal = update.getOldValue();
+      Optional<Long> newVal = update.getNewValue();
+
+      if (oldVal.isPresent()) {
+        String oldRow = String.format("iwc:%09d:%s", oldVal.get(), update.getKey());
+        tx.delete(Bytes.of(oldRow), new Column(Bytes.EMPTY, Bytes.EMPTY));
+      }
+
+      if (newVal.isPresent()) {
+        String newRow = String.format("iwc:%09d:%s", newVal.get(), update.getKey());
+        tx.set(Bytes.of(newRow), new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kryo/pom.xml b/modules/kryo/pom.xml
index fe00f96..eaed2ee 100644
--- a/modules/kryo/pom.xml
+++ b/modules/kryo/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.fluo</groupId>
-    <artifactId>fluo-recipes-parent</artifactId>
+    <artifactId>fluo-recipes</artifactId>
     <version>1.0.0-incubating-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java
----------------------------------------------------------------------
diff --git a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java
index aff9e7c..e8d7997 100644
--- a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java
+++ b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java
@@ -110,7 +110,7 @@ public class KryoSimplerSerializer implements SimpleSerializer, Serializable {
   public KryoSimplerSerializer() {}
 
   /**
-   * Can call this method to create a serializer w/o calling {@link #init(Configuration)}
+   * Can call this method to create a serializer w/o calling {@link #init(SimpleConfiguration)}
    */
   public KryoSimplerSerializer(KryoFactory factory) {
     factoryType = factory.getClass().getName();

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java
----------------------------------------------------------------------
diff --git a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java b/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java
deleted file mode 100644
index 95a26a9..0000000
--- a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.core.serialization;
-
-import com.esotericsoftware.kryo.pool.KryoFactory;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.kryo.KryoSimplerSerializer;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class KryoSimpleSerializerTest {
-
-  private static final KryoFactory KRYO_FACTORY = new KryoSimplerSerializer.DefaultFactory();
-
-  public void testColumn() {
-    SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY);
-    Column before = new Column("a", "b");
-    byte[] barray = serializer.serialize(before);
-    Column after = serializer.deserialize(barray, Column.class);
-    Assert.assertEquals(before, after);
-  }
-
-  @Test
-  public void testBytes() {
-    SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY);
-    Bytes before = Bytes.of("test");
-    byte[] barray = serializer.serialize(before);
-    Bytes after = serializer.deserialize(barray, Bytes.class);
-    Assert.assertEquals(before, after);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java
----------------------------------------------------------------------
diff --git a/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java b/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java
new file mode 100644
index 0000000..d2306d2
--- /dev/null
+++ b/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.kryo;
+
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.apache.fluo.recipes.kryo.KryoSimplerSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KryoSimpleSerializerTest {
+
+  private static final KryoFactory KRYO_FACTORY = new KryoSimplerSerializer.DefaultFactory();
+
+  public void testColumn() {
+    SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY);
+    Column before = new Column("a", "b");
+    byte[] barray = serializer.serialize(before);
+    Column after = serializer.deserialize(barray, Column.class);
+    Assert.assertEquals(before, after);
+  }
+
+  @Test
+  public void testBytes() {
+    SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY);
+    Bytes before = Bytes.of("test");
+    byte[] barray = serializer.serialize(before);
+    Bytes after = serializer.deserialize(barray, Bytes.class);
+    Assert.assertEquals(before, after);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 1a0969e..5cb80a4 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.fluo</groupId>
-    <artifactId>fluo-recipes-parent</artifactId>
+    <artifactId>fluo-recipes</artifactId>
     <version>1.0.0-incubating-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java
deleted file mode 100644
index 4ad25e2..0000000
--- a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.spark;
-
-import java.util.List;
-
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.fluo.recipes.test.FluoITHelper;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class FluoSparkHelperIT extends AccumuloExportITBase {
-
-  static JavaSparkContext ctx;
-
-  public FluoSparkHelperIT() {
-    super(false);
-  }
-
-  @BeforeClass
-  public static void setupIT() {
-    ctx = FluoSparkTestUtil.newSparkContext("fluo-spark-helper");
-  }
-
-  @AfterClass
-  public static void teardownIT() {
-    ctx.stop();
-  }
-
-  private List<RowColumnValue> getData() {
-    return FluoITHelper.parse("arow|acf|acq|aval", "brow|bcf|bcq|bval", "crow|ccf|ccq|cval");
-  }
-
-  @Test
-  public void testAccumuloBulkImport() throws Exception {
-    FluoSparkHelper fsh =
-        new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/"));
-    List<RowColumnValue> expected = getData();
-    final String accumuloTable = "table1";
-    getAccumuloConnector().tableOperations().create(accumuloTable);
-    fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)),
-        accumuloTable, new FluoSparkHelper.BulkImportOptions());
-    Assert.assertTrue(FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable,
-        expected));
-  }
-
-  @Test
-  public void testFluoBulkImport() throws Exception {
-    FluoSparkHelper fsh =
-        new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/"));
-    List<RowColumnValue> expected = getData();
-    fsh.bulkImportRcvToFluo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)),
-        new FluoSparkHelper.BulkImportOptions());
-
-    try (MiniFluo miniFluo = FluoFactory.newMiniFluo(getFluoConfiguration())) {
-      Assert.assertTrue(FluoITHelper.verifyFluoTable(getFluoConfiguration(), expected));
-
-      List<RowColumnValue> actualRead = FluoSparkHelper.toRcvRDD(fsh.readFromFluo(ctx)).collect();
-      Assert.assertTrue(FluoITHelper.verifyRowColumnValues(expected, actualRead));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java
new file mode 100644
index 0000000..c2d512d
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.spark.it;
+
+import java.util.List;
+
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.spark.FluoSparkHelper;
+import org.apache.fluo.recipes.spark.FluoSparkTestUtil;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.fluo.recipes.test.FluoITHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FluoSparkHelperIT extends AccumuloExportITBase {
+
+  static JavaSparkContext ctx;
+
+  public FluoSparkHelperIT() {
+    super(false);
+  }
+
+  @BeforeClass
+  public static void setupIT() {
+    ctx = FluoSparkTestUtil.newSparkContext("fluo-spark-helper");
+  }
+
+  @AfterClass
+  public static void teardownIT() {
+    ctx.stop();
+  }
+
+  private List<RowColumnValue> getData() {
+    return FluoITHelper.parse("arow|acf|acq|aval", "brow|bcf|bcq|bval", "crow|ccf|ccq|cval");
+  }
+
+  @Test
+  public void testAccumuloBulkImport() throws Exception {
+    FluoSparkHelper fsh =
+        new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/"));
+    List<RowColumnValue> expected = getData();
+    final String accumuloTable = "table1";
+    getAccumuloConnector().tableOperations().create(accumuloTable);
+    fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)),
+        accumuloTable, new FluoSparkHelper.BulkImportOptions());
+    Assert.assertTrue(FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable,
+        expected));
+  }
+
+  @Test
+  public void testFluoBulkImport() throws Exception {
+    FluoSparkHelper fsh =
+        new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/"));
+    List<RowColumnValue> expected = getData();
+    fsh.bulkImportRcvToFluo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)),
+        new FluoSparkHelper.BulkImportOptions());
+
+    try (MiniFluo miniFluo = FluoFactory.newMiniFluo(getFluoConfiguration())) {
+      Assert.assertTrue(FluoITHelper.verifyFluoTable(getFluoConfiguration(), expected));
+
+      List<RowColumnValue> actualRead = FluoSparkHelper.toRcvRDD(fsh.readFromFluo(ctx)).collect();
+      Assert.assertTrue(FluoITHelper.verifyRowColumnValues(expected, actualRead));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/test/pom.xml
----------------------------------------------------------------------
diff --git a/modules/test/pom.xml b/modules/test/pom.xml
index 8bfb466..86f5fbd 100644
--- a/modules/test/pom.xml
+++ b/modules/test/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.fluo</groupId>
-    <artifactId>fluo-recipes-parent</artifactId>
+    <artifactId>fluo-recipes</artifactId>
     <version>1.0.0-incubating-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
----------------------------------------------------------------------
diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
index ba7b1e6..acc13b7 100644
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
@@ -31,7 +31,6 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.fluo.recipes.accumulo.ops.TableOperations;
-import org.apache.fluo.recipes.core.common.TableOptimizations;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c926c7..5be0541 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,8 +20,7 @@
     <artifactId>fluo-parent</artifactId>
     <version>1-incubating</version>
   </parent>
-  <groupId>org.apache.fluo</groupId>
-  <artifactId>fluo-recipes-parent</artifactId>
+  <artifactId>fluo-recipes</artifactId>
   <version>1.0.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Fluo Recipes Parent</name>
@@ -51,7 +50,7 @@
     <url>https://github.com/apache/incubator-fluo-recipes/issues</url>
   </issueManagement>
   <properties>
-    <accumulo.version>1.6.5</accumulo.version>
+    <accumulo.version>1.6.6</accumulo.version>
     <curator.version>2.7.1</curator.version>
     <findbugs.maxRank>13</findbugs.maxRank>
     <fluo.version>1.0.0-incubating</fluo.version>
@@ -225,16 +224,6 @@
     <pluginManagement>
       <plugins>
         <plugin>
-          <groupId>org.apache.rat</groupId>
-          <artifactId>apache-rat-plugin</artifactId>
-          <configuration>
-            <excludes>
-              <exclude>README.md</exclude>
-              <exclude>docs/**.md</exclude>
-            </excludes>
-          </configuration>
-        </plugin>
-        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-failsafe-plugin</artifactId>
           <configuration>
@@ -244,29 +233,6 @@
             </systemPropertyVariables>
           </configuration>
         </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-jar-plugin</artifactId>
-          <configuration>
-            <archive>
-              <manifest>
-                <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
-                <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
-              </manifest>
-              <manifestEntries>
-                <!-- sealing breaks ITs with shaded jar, which is used by this example -->
-                <Sealed>false</Sealed>
-              </manifestEntries>
-            </archive>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-javadoc-plugin</artifactId>
-          <configuration>
-            <excludePackageNames>*.impl.*</excludePackageNames>
-          </configuration>
-        </plugin>
       </plugins>
     </pluginManagement>
     <plugins>


Mime
View raw message