Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0AFCE200BA1 for ; Mon, 17 Oct 2016 16:56:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 09652160AEC; Mon, 17 Oct 2016 14:56:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A1B91160AE5 for ; Mon, 17 Oct 2016 16:56:37 +0200 (CEST) Received: (qmail 65989 invoked by uid 500); 17 Oct 2016 14:56:36 -0000 Mailing-List: contact commits-help@fluo.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@fluo.incubator.apache.org Delivered-To: mailing list commits@fluo.incubator.apache.org Received: (qmail 65980 invoked by uid 99); 17 Oct 2016 14:56:36 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 14:56:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id F3B48C0A7F for ; Mon, 17 Oct 2016 14:56:35 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.218 X-Spam-Level: X-Spam-Status: No, score=-6.218 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, URIBL_RED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id crqEJLwISffb for ; Mon, 17 Oct 2016 14:56:31 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org 
[140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 2E1195F299 for ; Mon, 17 Oct 2016 14:56:29 +0000 (UTC) Received: (qmail 65941 invoked by uid 99); 17 Oct 2016 14:56:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 14:56:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C50ADF9AD; Mon, 17 Oct 2016 14:56:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@fluo.incubator.apache.org Date: Mon, 17 Oct 2016 14:56:28 -0000 Message-Id: <34ccb40f78124e9bb94c461278efc3be@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-fluo-recipes git commit: Misc release prep cleanup archived-at: Mon, 17 Oct 2016 14:56:40 -0000 Misc release prep cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/10ccb16a Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/10ccb16a Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/10ccb16a Branch: refs/heads/master Commit: 10ccb16aedd8c141f5bd21be6822481471afa7ad Parents: 573aeb7 Author: Keith Turner Authored: Fri Oct 14 17:04:59 2016 -0400 Committer: Keith Turner Committed: Fri Oct 14 17:43:06 2016 -0400 ---------------------------------------------------------------------- README.md | 16 + docs/accumulo-export-queue.md | 16 + docs/cfm.md | 16 + docs/export-queue.md | 16 + docs/recording-tx.md | 16 + docs/row-hasher.md | 16 + docs/serialization.md | 16 + docs/table-optimization.md | 16 + docs/testing.md | 16 + docs/transient.md | 16 + modules/accumulo/pom.xml | 2 +- modules/core/pom.xml | 2 +- .../recipes/core/common/TableOptimizations.java | 2 +- 
.../fluo/recipes/core/export/ExportBucket.java | 14 +- .../fluo/recipes/core/impl/BucketUtil.java | 31 -- .../fluo/recipes/core/map/CollisionFreeMap.java | 22 +- .../recipes/core/export/DocumentLoader.java | 35 -- .../recipes/core/export/DocumentObserver.java | 63 ---- .../recipes/core/export/ExportBufferIT.java | 106 ------ .../fluo/recipes/core/export/ExportQueueIT.java | 114 ------ .../recipes/core/export/ExportTestBase.java | 271 -------------- .../recipes/core/export/GsonSerializer.java | 42 --- .../fluo/recipes/core/export/OptionsTest.java | 1 + .../fluo/recipes/core/export/RefInfo.java | 26 -- .../fluo/recipes/core/export/RefUpdates.java | 43 --- .../recipes/core/export/it/DocumentLoader.java | 35 ++ .../core/export/it/DocumentObserver.java | 64 ++++ .../recipes/core/export/it/ExportBufferIT.java | 107 ++++++ .../recipes/core/export/it/ExportQueueIT.java | 114 ++++++ .../recipes/core/export/it/ExportTestBase.java | 274 ++++++++++++++ .../recipes/core/export/it/GsonSerializer.java | 42 +++ .../fluo/recipes/core/export/it/RefInfo.java | 26 ++ .../fluo/recipes/core/export/it/RefUpdates.java | 43 +++ .../fluo/recipes/core/map/BigUpdateIT.java | 208 ----------- .../recipes/core/map/CollisionFreeMapIT.java | 352 ------------------ .../fluo/recipes/core/map/DocumentLoader.java | 35 -- .../fluo/recipes/core/map/DocumentObserver.java | 89 ----- .../fluo/recipes/core/map/SplitsTest.java | 1 + .../fluo/recipes/core/map/TestSerializer.java | 45 --- .../recipes/core/map/WordCountCombiner.java | 36 -- .../recipes/core/map/WordCountObserver.java | 47 --- .../fluo/recipes/core/map/it/BigUpdateIT.java | 212 +++++++++++ .../recipes/core/map/it/CollisionFreeMapIT.java | 353 +++++++++++++++++++ .../recipes/core/map/it/DocumentLoader.java | 35 ++ .../recipes/core/map/it/DocumentObserver.java | 90 +++++ .../recipes/core/map/it/TestSerializer.java | 45 +++ .../recipes/core/map/it/WordCountCombiner.java | 38 ++ .../recipes/core/map/it/WordCountObserver.java | 49 +++ 
modules/kryo/pom.xml | 2 +- .../recipes/kryo/KryoSimplerSerializer.java | 2 +- .../serialization/KryoSimpleSerializerTest.java | 45 --- .../recipes/kryo/KryoSimpleSerializerTest.java | 46 +++ modules/spark/pom.xml | 2 +- .../fluo/recipes/spark/FluoSparkHelperIT.java | 82 ----- .../recipes/spark/it/FluoSparkHelperIT.java | 84 +++++ modules/test/pom.xml | 2 +- .../fluo/recipes/test/AccumuloExportITBase.java | 1 - pom.xml | 38 +- 58 files changed, 1856 insertions(+), 1722 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index b592204..e5d3a5a 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,19 @@ + # Apache Fluo Recipes [![Build Status][ti]][tl] [![Apache License][li]][ll] http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/accumulo-export-queue.md ---------------------------------------------------------------------- diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md index b880d58..8b4bf39 100644 --- a/docs/accumulo-export-queue.md +++ b/docs/accumulo-export-queue.md @@ -1,3 +1,19 @@ + # Accumulo Export Queue Specialization ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/cfm.md ---------------------------------------------------------------------- diff --git a/docs/cfm.md b/docs/cfm.md index 7b3a2ac..85b05b0 100644 --- a/docs/cfm.md +++ b/docs/cfm.md @@ -1,3 +1,19 @@ + # Collision Free Map Recipe ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/export-queue.md ---------------------------------------------------------------------- diff --git a/docs/export-queue.md b/docs/export-queue.md index c366e9d..7ad931f 100644 --- a/docs/export-queue.md +++ b/docs/export-queue.md @@ -1,3 +1,19 @@ + # Export 
Queue Recipe ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/recording-tx.md ---------------------------------------------------------------------- diff --git a/docs/recording-tx.md b/docs/recording-tx.md index 0dca693..7a6fb8e 100644 --- a/docs/recording-tx.md +++ b/docs/recording-tx.md @@ -1,3 +1,19 @@ + # RecordingTransaction recipe A `RecordingTransaction` is an implementation of `Transaction` that logs all transaction operations http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/row-hasher.md ---------------------------------------------------------------------- diff --git a/docs/row-hasher.md b/docs/row-hasher.md index 1f5158d..d6d603b 100644 --- a/docs/row-hasher.md +++ b/docs/row-hasher.md @@ -1,3 +1,19 @@ + # Row hash prefix recipe ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/serialization.md ---------------------------------------------------------------------- diff --git a/docs/serialization.md b/docs/serialization.md index 9b664a0..25b9d83 100644 --- a/docs/serialization.md +++ b/docs/serialization.md @@ -1,3 +1,19 @@ + # Serializing Data Various Fluo Recipes deal with POJOs and need to serialize them. 
The http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/table-optimization.md ---------------------------------------------------------------------- diff --git a/docs/table-optimization.md b/docs/table-optimization.md index 7235981..f170b77 100644 --- a/docs/table-optimization.md +++ b/docs/table-optimization.md @@ -1,3 +1,19 @@ + # Fluo Table Optimization ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/testing.md ---------------------------------------------------------------------- diff --git a/docs/testing.md b/docs/testing.md index 520fd1d..0908857 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -1,3 +1,19 @@ + # Testing Fluo includes MiniFluo which makes it possible to write an integeration test that http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/docs/transient.md ---------------------------------------------------------------------- diff --git a/docs/transient.md b/docs/transient.md index d0ac845..a73a197 100644 --- a/docs/transient.md +++ b/docs/transient.md @@ -1,3 +1,19 @@ + # Transient data ## Background http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml index 945b9fc..751fbee 100644 --- a/modules/accumulo/pom.xml +++ b/modules/accumulo/pom.xml @@ -17,7 +17,7 @@ 4.0.0 org.apache.fluo - fluo-recipes-parent + fluo-recipes 1.0.0-incubating-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 4d6893d..720d5f0 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -17,7 +17,7 @@ 4.0.0 org.apache.fluo - fluo-recipes-parent + fluo-recipes 1.0.0-incubating-SNAPSHOT 
../../pom.xml http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java index 3f0e5f2..f2833c6 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java @@ -77,7 +77,7 @@ public class TableOptimizations { * initialization. After Fluo is initialized, the optimizations can be retrieved by calling * {@link #getConfiguredOptimizations(FluoConfiguration)}. * - * @param application config, likely obtained from calling + * @param appConfig config, likely obtained from calling * {@link FluoConfiguration#getAppConfiguration()} * @param key A unique identifier for the optimization * @param clazz The optimization factory type. 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java index 6dadb8c..8100b37 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java @@ -18,6 +18,7 @@ package org.apache.fluo.recipes.core.export; import java.util.Iterator; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.data.Bytes; @@ -25,7 +26,6 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; -import org.apache.fluo.recipes.core.impl.BucketUtil; import org.apache.fluo.recipes.core.types.StringEncoder; import org.apache.fluo.recipes.core.types.TypeLayer; import org.apache.fluo.recipes.core.types.TypedTransactionBase; @@ -47,8 +47,18 @@ class ExportBucket { private final String qid; private final Bytes bucketRow; + static String genBucketId(int bucket, int maxBucket) { + Preconditions.checkArgument(bucket >= 0); + Preconditions.checkArgument(maxBucket > 0); + + int bits = 32 - Integer.numberOfLeadingZeros(maxBucket); + int bucketLen = bits / 4 + (bits % 4 > 0 ? 
1 : 0); + + return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0'); + } + static Bytes generateBucketRow(String qid, int bucket, int numBuckets) { - return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets)); + return Bytes.of(qid + ":" + genBucketId(bucket, numBuckets)); } ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) { http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java deleted file mode 100644 index be28878..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.core.impl; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -public class BucketUtil { - public static String genBucketId(int bucket, int maxBucket) { - Preconditions.checkArgument(bucket >= 0); - Preconditions.checkArgument(maxBucket > 0); - - int bits = 32 - Integer.numberOfLeadingZeros(maxBucket); - int bucketLen = bits / 4 + (bits % 4 > 0 ? 1 : 0); - - return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java index d551096..715d330 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.regex.Pattern; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -48,7 +49,6 @@ import org.apache.fluo.recipes.core.common.TableOptimizations; import org.apache.fluo.recipes.core.common.TableOptimizations.TableOptimizationsFactory; import org.apache.fluo.recipes.core.common.RowRange; import org.apache.fluo.recipes.core.common.TransientRegistry; -import org.apache.fluo.recipes.core.impl.BucketUtil; import org.apache.fluo.recipes.core.serialization.SimpleSerializer; /** @@ -290,7 +290,7 @@ public class CollisionFreeMap { byte[] k = serializer.serialize(key); int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = 
BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); + String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets); BytesBuilder rowBuilder = Bytes.builder(); @@ -352,7 +352,7 @@ public class CollisionFreeMap { for (Entry entry : updates.entrySet()) { byte[] k = serializer.serialize(entry.getKey()); int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); + String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets); // reset to the common row prefix rowBuilder.setLength(prefixLength); @@ -377,6 +377,16 @@ public class CollisionFreeMap { } } + static String genBucketId(int bucket, int maxBucket) { + Preconditions.checkArgument(bucket >= 0); + Preconditions.checkArgument(maxBucket > 0); + + int bits = 32 - Integer.numberOfLeadingZeros(maxBucket); + int bucketLen = bits / 4 + (bits % 4 > 0 ? 1 : 0); + + return Strings.padStart(Integer.toHexString(bucket), bucketLen, '0'); + } + public static CollisionFreeMap getInstance(String mapId, SimpleConfiguration appConf) { Options opts = new Options(mapId, appConf); @@ -424,7 +434,7 @@ public class CollisionFreeMap { public RowColumnValue convert(K2 key, V2 val) { byte[] k = serializer.serialize(key); int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); + String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets); BytesBuilder bb = Bytes.builder(); Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes(); @@ -605,7 +615,7 @@ public class CollisionFreeMap { List dataSplits = new ArrayList<>(); for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { - String bucketId = BucketUtil.genBucketId(i, opts.numBuckets); + String bucketId = genBucketId(i, opts.numBuckets); rowBuilder.setLength(mapId.length()); 
dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes()); } @@ -613,7 +623,7 @@ public class CollisionFreeMap { List updateSplits = new ArrayList<>(); for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { - String bucketId = BucketUtil.genBucketId(i, opts.numBuckets); + String bucketId = genBucketId(i, opts.numBuckets); rowBuilder.setLength(mapId.length()); updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes()); } http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java deleted file mode 100644 index f7af791..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.core.export; - -import org.apache.fluo.recipes.core.types.TypedLoader; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; - -public class DocumentLoader extends TypedLoader { - - String docid; - String refs[]; - - DocumentLoader(String docid, String... refs) { - this.docid = docid; - this.refs = refs; - } - - @Override - public void load(TypedTransactionBase tx, Context context) throws Exception { - tx.mutate().row("d:" + docid).fam("content").qual("new").set(String.join(" ", refs)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java deleted file mode 100644 index c4c11d8..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.core.export; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.export.ExportTestBase.RefExporter; -import org.apache.fluo.recipes.core.types.TypedObserver; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; - -public class DocumentObserver extends TypedObserver { - - ExportQueue refExportQueue; - - @Override - public void init(Context context) throws Exception { - refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration()); - } - - @Override - public ObservedColumn getObservedColumn() { - return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG); - } - - @Override - public void process(TypedTransactionBase tx, Bytes row, Column col) { - String newContent = tx.get().row(row).col(col).toString(); - Set newRefs = new HashSet<>(Arrays.asList(newContent.split(" "))); - Set currentRefs = - new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("") - .split(" "))); - - Set addedRefs = new HashSet<>(newRefs); - addedRefs.removeAll(currentRefs); - - Set deletedRefs = new HashSet<>(currentRefs); - deletedRefs.removeAll(newRefs); - - String key = row.toString().substring(2); - RefUpdates val = new RefUpdates(addedRefs, deletedRefs); - - refExportQueue.add(tx, key, val); - - tx.mutate().row(row).fam("content").qual("current").set(newContent); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java deleted file mode 100644 index 
d54982b..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.export; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.Transaction; -import org.junit.Assert; -import org.junit.Test; - -public class ExportBufferIT extends ExportTestBase { - - @Override - protected int getNumBuckets() { - return 2; - } - - @Override - protected Integer getBufferSize() { - return 1024; - } - - @Test - public void testSmallExportBuffer() { - // try setting the export buffer size small. Make sure everything is exported. 
- - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - ExportQueue refExportQueue = - ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration()); - try (Transaction tx = fc.newTransaction()) { - for (int i = 0; i < 1000; i++) { - refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0]))); - } - - tx.commit(); - } - } - - miniFluo.waitForObservers(); - - Map> erefs = getExportedReferees(); - Map> expected = new HashMap<>(); - - for (int i = 0; i < 1000; i++) { - expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i)); - expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i)); - } - - assertEquals(expected, erefs); - int prevNumExportCalls = getNumExportCalls(); - Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports - // calls - - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - ExportQueue refExportQueue = - ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration()); - try (Transaction tx = fc.newTransaction()) { - for (int i = 0; i < 1000; i++) { - refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10))); - } - - tx.commit(); - } - } - - miniFluo.waitForObservers(); - - erefs = getExportedReferees(); - expected = new HashMap<>(); - - for (int i = 0; i < 1000; i++) { - expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i)); - expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i)); - } - - assertEquals(expected, erefs); - prevNumExportCalls = getNumExportCalls() - prevNumExportCalls; - Assert.assertTrue(prevNumExportCalls > 10); - } - - public void assertEquals(Map> expected, Map> actual) { - if (!expected.equals(actual)) { - System.out.println("*** diff ***"); - diff(expected, actual); - Assert.fail(); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java deleted file mode 100644 index b4e167c..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.core.export; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.LoaderExecutor; -import org.apache.fluo.api.config.FluoConfiguration; -import org.junit.Assert; -import org.junit.Test; - -public class ExportQueueIT extends ExportTestBase { - - @Test - public void testExport() { - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0999", "0005", "0002")); - loader.execute(new DocumentLoader("0002", "0999", "0042")); - loader.execute(new DocumentLoader("0005", "0999", "0042")); - loader.execute(new DocumentLoader("0042", "0999")); - } - - miniFluo.waitForObservers(); - - Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999")); - Assert.assertEquals(ns("0999"), getExportedReferees("0002")); - Assert.assertEquals(ns("0999"), getExportedReferees("0005")); - Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042")); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0999", "0005", "0042")); - } - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0999", "0005")); - } - - miniFluo.waitForObservers(); - - Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999")); - Assert.assertEquals(ns(new String[0]), getExportedReferees("0002")); - Assert.assertEquals(ns("0999"), getExportedReferees("0005")); - Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042")); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0042", "0999", "0002", "0005")); - loader.execute(new DocumentLoader("0005", "0002")); - } - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0005", "0003")); - } - - miniFluo.waitForObservers(); 
- - Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999")); - Assert.assertEquals(ns("0042"), getExportedReferees("0002")); - Assert.assertEquals(ns("0005"), getExportedReferees("0003")); - Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005")); - Assert.assertEquals(ns("0002"), getExportedReferees("0042")); - - } - } - - @Test - public void exportStressTest() { - FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration()); - config.setLoaderQueueSize(100); - config.setLoaderThreads(20); - - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - - loadRandom(fc, 1000, 500); - - miniFluo.waitForObservers(); - - diff(getFluoReferees(fc), getExportedReferees()); - - assertEquals(getFluoReferees(fc), getExportedReferees(), fc); - - loadRandom(fc, 1000, 500); - - miniFluo.waitForObservers(); - - assertEquals(getFluoReferees(fc), getExportedReferees(), fc); - - loadRandom(fc, 1000, 10000); - - miniFluo.waitForObservers(); - - assertEquals(getFluoReferees(fc), getExportedReferees(), fc); - - loadRandom(fc, 1000, 10000); - - miniFluo.waitForObservers(); - - assertEquals(getFluoReferees(fc), getExportedReferees(), fc); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java deleted file mode 100644 index fd3ece1..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. 
See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.export; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.Set; - -import com.google.common.collect.Iterators; -import org.apache.commons.io.FileUtils; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.LoaderExecutor; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.core.serialization.SimpleSerializer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; - -public class ExportTestBase { - - private static Map> globalExports = new HashMap<>(); - private static int 
exportCalls = 0; - - protected static Set getExportedReferees(String node) { - synchronized (globalExports) { - Set ret = new HashSet<>(); - - Map referees = globalExports.get(node); - - if (referees == null) { - return ret; - } - - referees.forEach((k, v) -> { - if (!v.deleted) - ret.add(k); - }); - - return ret; - } - } - - protected static Map> getExportedReferees() { - synchronized (globalExports) { - - Map> ret = new HashMap<>(); - - for (String k : globalExports.keySet()) { - Set referees = getExportedReferees(k); - if (referees.size() > 0) { - ret.put(k, referees); - } - } - - return ret; - } - } - - protected static int getNumExportCalls() { - synchronized (globalExports) { - return exportCalls; - } - } - - public static class RefExporter extends Exporter { - - public static final String QUEUE_ID = "req"; - - private void updateExports(String key, long seq, String addedRef, boolean deleted) { - Map referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>()); - referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? 
new RefInfo(seq, deleted) : v); - } - - @Override - protected void processExports(Iterator> exportIterator) { - ArrayList> exportList = new ArrayList<>(); - Iterators.addAll(exportList, exportIterator); - - synchronized (globalExports) { - exportCalls++; - - for (SequencedExport se : exportList) { - for (String addedRef : se.getValue().getAddedRefs()) { - updateExports(se.getKey(), se.getSequence(), addedRef, false); - } - - for (String deletedRef : se.getValue().getDeletedRefs()) { - updateExports(se.getKey(), se.getSequence(), deletedRef, true); - } - } - } - } - } - - protected MiniFluo miniFluo; - - protected int getNumBuckets() { - return 13; - } - - protected Integer getBufferSize() { - return null; - } - - @Before - public void setUpFluo() throws Exception { - FileUtils.deleteQuietly(new File("target/mini")); - - FluoConfiguration props = new FluoConfiguration(); - props.setApplicationName("eqt"); - props.setWorkerThreads(20); - props.setMiniDataDir("target/mini"); - - ObserverSpecification doc = new ObserverSpecification(DocumentObserver.class.getName()); - props.addObserver(doc); - - SimpleSerializer.setSerializer(props, GsonSerializer.class); - - ExportQueue.Options exportQueueOpts = - new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class, - RefUpdates.class, getNumBuckets()); - - if (getBufferSize() != null) { - exportQueueOpts.setBufferSize(getBufferSize()); - } - - ExportQueue.configure(props, exportQueueOpts); - - miniFluo = FluoFactory.newMiniFluo(props); - - globalExports.clear(); - exportCalls = 0; - } - - @After - public void tearDownFluo() throws Exception { - if (miniFluo != null) { - miniFluo.close(); - } - } - - protected static Set ns(String... sa) { - return new HashSet<>(Arrays.asList(sa)); - } - - protected static String nk(int i) { - return String.format("%06d", i); - } - - protected static Set ns(int... 
ia) { - HashSet ret = new HashSet<>(); - for (int i : ia) { - ret.add(nk(i)); - } - return ret; - } - - public void assertEquals(Map> expected, Map> actual, - FluoClient fc) { - if (!expected.equals(actual)) { - System.out.println("*** diff ***"); - diff(expected, actual); - System.out.println("*** fluo dump ***"); - dump(fc); - System.out.println("*** map dump ***"); - - Assert.fail(); - } - } - - protected void loadRandom(FluoClient fc, int num, int maxDocId) { - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - Random rand = new Random(); - - for (int i = 0; i < num; i++) { - String docid = String.format("%05d", rand.nextInt(maxDocId)); - String[] refs = new String[rand.nextInt(20) + 1]; - for (int j = 0; j < refs.length; j++) { - refs[j] = String.format("%05d", rand.nextInt(maxDocId)); - } - - loader.execute(new DocumentLoader(docid, refs)); - } - } - } - - protected void diff(Map> fr, Map> er) { - HashSet allKeys = new HashSet<>(fr.keySet()); - allKeys.addAll(er.keySet()); - - for (String k : allKeys) { - Set s1 = fr.getOrDefault(k, Collections.emptySet()); - Set s2 = er.getOrDefault(k, Collections.emptySet()); - - HashSet sub1 = new HashSet<>(s1); - sub1.removeAll(s2); - - HashSet sub2 = new HashSet<>(s2); - sub2.removeAll(s1); - - if (sub1.size() > 0 || sub2.size() > 0) { - System.out.println(k + " " + sub1 + " " + sub2); - } - - } - } - - protected Map> getFluoReferees(FluoClient fc) { - Map> fluoReferees = new HashMap<>(); - - try (Snapshot snap = fc.newSnapshot()) { - - Column currCol = new Column("content", "current"); - RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build(); - - for (ColumnScanner columnScanner : rowScanner) { - String docid = columnScanner.getsRow().substring(2); - - for (ColumnValue columnValue : columnScanner) { - String[] refs = columnValue.getsValue().split(" "); - - for (String ref : refs) { - if (ref.isEmpty()) - continue; - - fluoReferees.computeIfAbsent(ref, k -> new 
HashSet<>()).add(docid); - } - } - } - } - return fluoReferees; - } - - public static void dump(FluoClient fc) { - try (Snapshot snap = fc.newSnapshot()) { - CellScanner scanner = snap.scanner().build(); - scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "] col:[" - + rcv.getColumn() + "] val:[" + rcv.getValue() + "]")); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java deleted file mode 100644 index 2d45ff3..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.core.export; - -import java.nio.charset.StandardCharsets; - -import com.google.gson.Gson; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.recipes.core.serialization.SimpleSerializer; - -public class GsonSerializer implements SimpleSerializer { - - private Gson gson = new Gson(); - - @Override - public void init(SimpleConfiguration appConfig) { - - } - - @Override - public byte[] serialize(T obj) { - return gson.toJson(obj).getBytes(StandardCharsets.UTF_8); - } - - @Override - public T deserialize(byte[] serObj, Class clazz) { - return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java index 60982bd..376223b 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java @@ -17,6 +17,7 @@ package org.apache.fluo.recipes.core.export; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.core.export.ExportQueue.Options; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java 
b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java deleted file mode 100644 index f4d1c76..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.export; - -class RefInfo { - long seq; - boolean deleted; - - public RefInfo(long seq, boolean deleted) { - this.seq = seq; - this.deleted = deleted; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java deleted file mode 100644 index efa94dd..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. 
See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.export; - -import java.util.Set; - -public class RefUpdates { - private Set addedRefs; - private Set deletedRefs; - - public RefUpdates() {} - - public RefUpdates(Set addedRefs, Set deletedRefs) { - this.addedRefs = addedRefs; - this.deletedRefs = deletedRefs; - } - - public Set getAddedRefs() { - return addedRefs; - } - - public Set getDeletedRefs() { - return deletedRefs; - } - - @Override - public String toString() { - return "added:" + addedRefs + " deleted:" + deletedRefs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java new file mode 100644 index 0000000..6b8ca36 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentLoader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export.it; + +import org.apache.fluo.recipes.core.types.TypedLoader; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; + +public class DocumentLoader extends TypedLoader { + + String docid; + String refs[]; + + DocumentLoader(String docid, String... refs) { + this.docid = docid; + this.refs = refs; + } + + @Override + public void load(TypedTransactionBase tx, Context context) throws Exception { + tx.mutate().row("d:" + docid).fam("content").qual("new").set(String.join(" ", refs)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java new file mode 100644 index 0000000..c9765a7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/DocumentObserver.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export.it; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.recipes.core.export.ExportQueue; +import org.apache.fluo.recipes.core.export.it.ExportTestBase.RefExporter; +import org.apache.fluo.recipes.core.types.TypedObserver; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; + +public class DocumentObserver extends TypedObserver { + + ExportQueue refExportQueue; + + @Override + public void init(Context context) throws Exception { + refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration()); + } + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG); + } + + @Override + public void process(TypedTransactionBase tx, Bytes row, Column col) { + String newContent = tx.get().row(row).col(col).toString(); + Set newRefs = new HashSet<>(Arrays.asList(newContent.split(" "))); + Set currentRefs = + new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("") + .split(" "))); + + Set addedRefs = new HashSet<>(newRefs); + addedRefs.removeAll(currentRefs); + + Set 
deletedRefs = new HashSet<>(currentRefs); + deletedRefs.removeAll(newRefs); + + String key = row.toString().substring(2); + RefUpdates val = new RefUpdates(addedRefs, deletedRefs); + + refExportQueue.add(tx, key, val); + + tx.mutate().row(row).fam("content").qual("current").set(newContent); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java new file mode 100644 index 0000000..d937ac8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportBufferIT.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.export.it; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.recipes.core.export.ExportQueue; +import org.junit.Assert; +import org.junit.Test; + +public class ExportBufferIT extends ExportTestBase { + + @Override + protected int getNumBuckets() { + return 2; + } + + @Override + protected Integer getBufferSize() { + return 1024; + } + + @Test + public void testSmallExportBuffer() { + // try setting the export buffer size small. Make sure everything is exported. + + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + ExportQueue refExportQueue = + ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration()); + try (Transaction tx = fc.newTransaction()) { + for (int i = 0; i < 1000; i++) { + refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0]))); + } + + tx.commit(); + } + } + + miniFluo.waitForObservers(); + + Map> erefs = getExportedReferees(); + Map> expected = new HashMap<>(); + + for (int i = 0; i < 1000; i++) { + expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i)); + expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i)); + } + + assertEquals(expected, erefs); + int prevNumExportCalls = getNumExportCalls(); + Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports + // calls + + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + ExportQueue refExportQueue = + ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration()); + try (Transaction tx = fc.newTransaction()) { + for (int i = 0; i < 1000; i++) { + refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10))); + } + + tx.commit(); + } + } + + 
miniFluo.waitForObservers(); + + erefs = getExportedReferees(); + expected = new HashMap<>(); + + for (int i = 0; i < 1000; i++) { + expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i)); + expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i)); + } + + assertEquals(expected, erefs); + prevNumExportCalls = getNumExportCalls() - prevNumExportCalls; + Assert.assertTrue(prevNumExportCalls > 10); + } + + public void assertEquals(Map> expected, Map> actual) { + if (!expected.equals(actual)) { + System.out.println("*** diff ***"); + diff(expected, actual); + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java new file mode 100644 index 0000000..55c1a27 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportQueueIT.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.export.it; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.LoaderExecutor; +import org.apache.fluo.api.config.FluoConfiguration; +import org.junit.Assert; +import org.junit.Test; + +public class ExportQueueIT extends ExportTestBase { + + @Test + public void testExport() { + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0999", "0005", "0002")); + loader.execute(new DocumentLoader("0002", "0999", "0042")); + loader.execute(new DocumentLoader("0005", "0999", "0042")); + loader.execute(new DocumentLoader("0042", "0999")); + } + + miniFluo.waitForObservers(); + + Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999")); + Assert.assertEquals(ns("0999"), getExportedReferees("0002")); + Assert.assertEquals(ns("0999"), getExportedReferees("0005")); + Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042")); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0999", "0005", "0042")); + } + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0999", "0005")); + } + + miniFluo.waitForObservers(); + + Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999")); + Assert.assertEquals(ns(new String[0]), getExportedReferees("0002")); + Assert.assertEquals(ns("0999"), getExportedReferees("0005")); + Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042")); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0042", "0999", "0002", "0005")); + loader.execute(new DocumentLoader("0005", "0002")); + } + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0005", "0003")); + } + + 
miniFluo.waitForObservers(); + + Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999")); + Assert.assertEquals(ns("0042"), getExportedReferees("0002")); + Assert.assertEquals(ns("0005"), getExportedReferees("0003")); + Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005")); + Assert.assertEquals(ns("0002"), getExportedReferees("0042")); + + } + } + + @Test + public void exportStressTest() { + FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration()); + config.setLoaderQueueSize(100); + config.setLoaderThreads(20); + + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + + loadRandom(fc, 1000, 500); + + miniFluo.waitForObservers(); + + diff(getFluoReferees(fc), getExportedReferees()); + + assertEquals(getFluoReferees(fc), getExportedReferees(), fc); + + loadRandom(fc, 1000, 500); + + miniFluo.waitForObservers(); + + assertEquals(getFluoReferees(fc), getExportedReferees(), fc); + + loadRandom(fc, 1000, 10000); + + miniFluo.waitForObservers(); + + assertEquals(getFluoReferees(fc), getExportedReferees(), fc); + + loadRandom(fc, 1000, 10000); + + miniFluo.waitForObservers(); + + assertEquals(getFluoReferees(fc), getExportedReferees(), fc); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java new file mode 100644 index 0000000..3c210a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export.it; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Iterators; +import org.apache.commons.io.FileUtils; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.LoaderExecutor; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.fluo.recipes.core.export.ExportQueue; +import org.apache.fluo.recipes.core.export.Exporter; +import org.apache.fluo.recipes.core.export.SequencedExport; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; +import 
org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +public class ExportTestBase { + + private static Map> globalExports = new HashMap<>(); + private static int exportCalls = 0; + + protected static Set getExportedReferees(String node) { + synchronized (globalExports) { + Set ret = new HashSet<>(); + + Map referees = globalExports.get(node); + + if (referees == null) { + return ret; + } + + referees.forEach((k, v) -> { + if (!v.deleted) + ret.add(k); + }); + + return ret; + } + } + + protected static Map> getExportedReferees() { + synchronized (globalExports) { + + Map> ret = new HashMap<>(); + + for (String k : globalExports.keySet()) { + Set referees = getExportedReferees(k); + if (referees.size() > 0) { + ret.put(k, referees); + } + } + + return ret; + } + } + + protected static int getNumExportCalls() { + synchronized (globalExports) { + return exportCalls; + } + } + + public static class RefExporter extends Exporter { + + public static final String QUEUE_ID = "req"; + + private void updateExports(String key, long seq, String addedRef, boolean deleted) { + Map referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>()); + referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? 
new RefInfo(seq, deleted) : v); + } + + @Override + protected void processExports(Iterator> exportIterator) { + ArrayList> exportList = new ArrayList<>(); + Iterators.addAll(exportList, exportIterator); + + synchronized (globalExports) { + exportCalls++; + + for (SequencedExport se : exportList) { + for (String addedRef : se.getValue().getAddedRefs()) { + updateExports(se.getKey(), se.getSequence(), addedRef, false); + } + + for (String deletedRef : se.getValue().getDeletedRefs()) { + updateExports(se.getKey(), se.getSequence(), deletedRef, true); + } + } + } + } + } + + protected MiniFluo miniFluo; + + protected int getNumBuckets() { + return 13; + } + + protected Integer getBufferSize() { + return null; + } + + @Before + public void setUpFluo() throws Exception { + FileUtils.deleteQuietly(new File("target/mini")); + + FluoConfiguration props = new FluoConfiguration(); + props.setApplicationName("eqt"); + props.setWorkerThreads(20); + props.setMiniDataDir("target/mini"); + + ObserverSpecification doc = new ObserverSpecification(DocumentObserver.class.getName()); + props.addObserver(doc); + + SimpleSerializer.setSerializer(props, GsonSerializer.class); + + ExportQueue.Options exportQueueOpts = + new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class, + RefUpdates.class, getNumBuckets()); + + if (getBufferSize() != null) { + exportQueueOpts.setBufferSize(getBufferSize()); + } + + ExportQueue.configure(props, exportQueueOpts); + + miniFluo = FluoFactory.newMiniFluo(props); + + globalExports.clear(); + exportCalls = 0; + } + + @After + public void tearDownFluo() throws Exception { + if (miniFluo != null) { + miniFluo.close(); + } + } + + protected static Set ns(String... sa) { + return new HashSet<>(Arrays.asList(sa)); + } + + protected static String nk(int i) { + return String.format("%06d", i); + } + + protected static Set ns(int... 
ia) { + HashSet ret = new HashSet<>(); + for (int i : ia) { + ret.add(nk(i)); + } + return ret; + } + + public void assertEquals(Map> expected, Map> actual, + FluoClient fc) { + if (!expected.equals(actual)) { + System.out.println("*** diff ***"); + diff(expected, actual); + System.out.println("*** fluo dump ***"); + dump(fc); + System.out.println("*** map dump ***"); + + Assert.fail(); + } + } + + protected void loadRandom(FluoClient fc, int num, int maxDocId) { + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + Random rand = new Random(); + + for (int i = 0; i < num; i++) { + String docid = String.format("%05d", rand.nextInt(maxDocId)); + String[] refs = new String[rand.nextInt(20) + 1]; + for (int j = 0; j < refs.length; j++) { + refs[j] = String.format("%05d", rand.nextInt(maxDocId)); + } + + loader.execute(new DocumentLoader(docid, refs)); + } + } + } + + protected void diff(Map> fr, Map> er) { + HashSet allKeys = new HashSet<>(fr.keySet()); + allKeys.addAll(er.keySet()); + + for (String k : allKeys) { + Set s1 = fr.getOrDefault(k, Collections.emptySet()); + Set s2 = er.getOrDefault(k, Collections.emptySet()); + + HashSet sub1 = new HashSet<>(s1); + sub1.removeAll(s2); + + HashSet sub2 = new HashSet<>(s2); + sub2.removeAll(s1); + + if (sub1.size() > 0 || sub2.size() > 0) { + System.out.println(k + " " + sub1 + " " + sub2); + } + + } + } + + protected Map> getFluoReferees(FluoClient fc) { + Map> fluoReferees = new HashMap<>(); + + try (Snapshot snap = fc.newSnapshot()) { + + Column currCol = new Column("content", "current"); + RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build(); + + for (ColumnScanner columnScanner : rowScanner) { + String docid = columnScanner.getsRow().substring(2); + + for (ColumnValue columnValue : columnScanner) { + String[] refs = columnValue.getsValue().split(" "); + + for (String ref : refs) { + if (ref.isEmpty()) + continue; + + fluoReferees.computeIfAbsent(ref, k -> new 
HashSet<>()).add(docid); + } + } + } + } + return fluoReferees; + } + + public static void dump(FluoClient fc) { + try (Snapshot snap = fc.newSnapshot()) { + CellScanner scanner = snap.scanner().build(); + scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "] col:[" + + rcv.getColumn() + "] val:[" + rcv.getValue() + "]")); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java new file mode 100644 index 0000000..aa610f7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/GsonSerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.export.it; + +import java.nio.charset.StandardCharsets; + +import com.google.gson.Gson; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; + +public class GsonSerializer implements SimpleSerializer { + + private Gson gson = new Gson(); + + @Override + public void init(SimpleConfiguration appConfig) { + + } + + @Override + public byte[] serialize(T obj) { + return gson.toJson(obj).getBytes(StandardCharsets.UTF_8); + } + + @Override + public T deserialize(byte[] serObj, Class clazz) { + return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java new file mode 100644 index 0000000..e08104b --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/RefInfo.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
/**
 * Immutable record of one reference update: the sequence number it carried and whether it was a
 * deletion.
 */
class RefInfo {
  final long seq;
  final boolean deleted;

  /**
   * @param seq sequence number of the update
   * @param deleted {@code true} when the update removed the reference, {@code false} when it added
   *        it
   */
  public RefInfo(long seq, boolean deleted) {
    this.seq = seq;
    this.deleted = deleted;
  }
}
/**
 * Value object carrying the references that were added to and deleted from a document.
 *
 * <p>
 * The fields are deliberately non-final and a no-arg constructor is provided so instances can be
 * populated reflectively; in that case either field may be left {@code null}. The getters never
 * return {@code null}, so callers can iterate the result directly.
 */
public class RefUpdates {
  private Set<String> addedRefs;
  private Set<String> deletedRefs;

  /**
   * Creates an instance with both reference sets unset.
   */
  public RefUpdates() {}

  /**
   * @param addedRefs references that were added
   * @param deletedRefs references that were deleted
   */
  public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
    this.addedRefs = addedRefs;
    this.deletedRefs = deletedRefs;
  }

  /**
   * @return the added references, or an empty immutable set when none were set
   */
  public Set<String> getAddedRefs() {
    if (addedRefs == null) {
      return java.util.Collections.<String>emptySet();
    }
    return addedRefs;
  }

  /**
   * @return the deleted references, or an empty immutable set when none were set
   */
  public Set<String> getDeletedRefs() {
    if (deletedRefs == null) {
      return java.util.Collections.<String>emptySet();
    }
    return deletedRefs;
  }

  @Override
  public String toString() {
    return "added:" + addedRefs + " deleted:" + deletedRefs;
  }
}