From: kturner@apache.org
To: commits@fluo.incubator.apache.org
Date: Fri, 15 Jul 2016 22:07:55 -0000
Subject: [07/10] incubator-fluo-recipes git commit: Updated package names in core module

Updated package names in core module

* Packages now have 'org.apache.fluo.recipes.core.' prefix

Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/beea3f96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/beea3f96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/beea3f96

Branch: refs/heads/master
Commit: beea3f96e5f633341d977d327f6aa1a0f12307f3
Parents: f1dce14
Author: Mike Walch
Authored: Fri Jul 15 11:31:28 2016 -0400
Committer: Mike Walch
Committed: Fri Jul 15 11:46:22 2016 -0400

----------------------------------------------------------------------
 docs/export-queue.md | 2 +-
 docs/row-hasher.md | 4 +-
 docs/serialization.md | 2 +-
 docs/transient.md | 2 +-
 .../recipes/accumulo/cmds/CompactTransient.java | 4 +-
 .../recipes/accumulo/cmds/OptimizeTable.java | 2 +-
 .../accumulo/export/AccumuloExporter.java | 4 +-
 .../accumulo/export/ReplicationExport.java | 4 +-
 .../recipes/accumulo/ops/TableOperations.java | 6 +-
 .../org/apache/fluo/recipes/common/Pirtos.java | 83 ---
 .../apache/fluo/recipes/common/RowRange.java | 82 ---
 .../fluo/recipes/common/TransientRegistry.java | 79 ---
 .../apache/fluo/recipes/core/common/Pirtos.java | 83 +++
 .../fluo/recipes/core/common/RowRange.java | 82 +++
 .../recipes/core/common/TransientRegistry.java | 79 +++
 .../fluo/recipes/core/data/RowHasher.java | 135 ++++
 .../apache/fluo/recipes/core/export/Export.java | 38 ++
 .../fluo/recipes/core/export/ExportBucket.java | 203 ++++++
 .../fluo/recipes/core/export/ExportEntry.java | 22 +
 .../recipes/core/export/ExportObserver.java | 140 ++++
 .../fluo/recipes/core/export/ExportQueue.java | 273 ++++++++
 .../fluo/recipes/core/export/Exporter.java | 64 ++
 .../recipes/core/export/SequencedExport.java | 29 +
 .../fluo/recipes/core/impl/BucketUtil.java | 24 +
 .../fluo/recipes/core/map/CollisionFreeMap.java | 657 +++++++++++++++++++
 .../core/map/CollisionFreeMapObserver.java | 53 ++
 .../apache/fluo/recipes/core/map/Combiner.java | 31 +
 .../recipes/core/map/NullUpdateObserver.java | 25 +
 .../apache/fluo/recipes/core/map/Update.java | 43 ++
 .../fluo/recipes/core/map/UpdateObserver.java | 34 +
 .../core/serialization/SimpleSerializer.java | 56 ++
 .../fluo/recipes/core/transaction/LogEntry.java | 114 ++++
 .../core/transaction/RecordingTransaction.java | 64 ++
 .../transaction/RecordingTransactionBase.java | 250 +++++++
 .../fluo/recipes/core/transaction/TxLog.java | 79 +++
 .../apache/fluo/recipes/core/types/Encoder.java | 86 +++
 .../fluo/recipes/core/types/StringEncoder.java | 86 +++
 .../fluo/recipes/core/types/TypeLayer.java | 488 ++++++++++++++
 .../fluo/recipes/core/types/TypedLoader.java | 45 ++
 .../fluo/recipes/core/types/TypedObserver.java | 46 ++
 .../fluo/recipes/core/types/TypedSnapshot.java | 38 ++
 .../recipes/core/types/TypedSnapshotBase.java | 555 ++++++++++++++++
 .../recipes/core/types/TypedTransaction.java | 46 ++
 .../core/types/TypedTransactionBase.java | 278 ++++++++
 .../org/apache/fluo/recipes/data/RowHasher.java | 135 ----
 .../org/apache/fluo/recipes/export/Export.java | 38 --
 .../fluo/recipes/export/ExportBucket.java | 203 ------
 .../apache/fluo/recipes/export/ExportEntry.java | 22 -
 .../fluo/recipes/export/ExportObserver.java | 140 ----
 .../apache/fluo/recipes/export/ExportQueue.java | 273 --------
 .../apache/fluo/recipes/export/Exporter.java | 64 --
 .../fluo/recipes/export/SequencedExport.java | 29 -
 .../apache/fluo/recipes/impl/BucketUtil.java | 24 -
 .../fluo/recipes/map/CollisionFreeMap.java | 657 -------------------
 .../recipes/map/CollisionFreeMapObserver.java | 53 --
 .../org/apache/fluo/recipes/map/Combiner.java | 31 -
 .../fluo/recipes/map/NullUpdateObserver.java | 25 -
 .../org/apache/fluo/recipes/map/Update.java | 43 --
 .../apache/fluo/recipes/map/UpdateObserver.java | 34 -
 .../recipes/serialization/SimpleSerializer.java | 56 --
 .../fluo/recipes/transaction/LogEntry.java | 114 ----
 .../transaction/RecordingTransaction.java | 64 --
 .../transaction/RecordingTransactionBase.java | 250 -------
 .../apache/fluo/recipes/transaction/TxLog.java | 79 ---
 .../org/apache/fluo/recipes/types/Encoder.java | 86 ---
 .../fluo/recipes/types/StringEncoder.java | 86 ---
 .../apache/fluo/recipes/types/TypeLayer.java | 488 --------------
 .../apache/fluo/recipes/types/TypedLoader.java | 45 --
 .../fluo/recipes/types/TypedObserver.java | 46 --
 .../fluo/recipes/types/TypedSnapshot.java | 38 --
 .../fluo/recipes/types/TypedSnapshotBase.java | 555 ----------------
 .../fluo/recipes/types/TypedTransaction.java | 46 --
 .../recipes/types/TypedTransactionBase.java | 278 --------
 .../fluo/recipes/common/TestGrouping.java | 94 ---
 .../recipes/common/TransientRegistryTest.java | 48 --
 .../fluo/recipes/core/common/TestGrouping.java | 92 +++
 .../core/common/TransientRegistryTest.java | 48 ++
 .../fluo/recipes/core/data/RowHasherTest.java | 62 ++
 .../recipes/core/export/DocumentLoader.java | 36 +
 .../recipes/core/export/DocumentObserver.java | 63 ++
 .../recipes/core/export/ExportBufferIT.java | 106 +++
 .../fluo/recipes/core/export/ExportQueueIT.java | 114 ++++
 .../recipes/core/export/ExportTestBase.java | 286 ++++++++
 .../recipes/core/export/GsonSerializer.java | 42 ++
 .../fluo/recipes/core/export/OptionsTest.java | 51 ++
 .../fluo/recipes/core/export/RefInfo.java | 26 +
 .../fluo/recipes/core/export/RefUpdates.java | 43 ++
 .../fluo/recipes/core/map/BigUpdateIT.java | 214 ++++++
 .../recipes/core/map/CollisionFreeMapIT.java | 361 ++++++++++
 .../fluo/recipes/core/map/DocumentLoader.java | 35 +
 .../fluo/recipes/core/map/DocumentObserver.java | 89 +++
 .../fluo/recipes/core/map/OptionsTest.java | 51 ++
 .../fluo/recipes/core/map/SplitsTest.java | 75 +++
 .../fluo/recipes/core/map/TestSerializer.java | 45 ++
 .../recipes/core/map/WordCountCombiner.java | 36 +
 .../recipes/core/map/WordCountObserver.java | 47 ++
 .../transaction/RecordingTransactionTest.java | 227 +++++++
 .../fluo/recipes/core/types/MockSnapshot.java | 30 +
 .../recipes/core/types/MockSnapshotBase.java | 202 ++++++
 .../recipes/core/types/MockTransaction.java | 36 +
 .../recipes/core/types/MockTransactionBase.java | 90 +++
 .../fluo/recipes/core/types/TypeLayerTest.java | 494 ++++++++++++++
 .../apache/fluo/recipes/data/RowHasherTest.java | 62 --
 .../fluo/recipes/export/DocumentLoader.java | 36 -
 .../fluo/recipes/export/DocumentObserver.java | 63 --
 .../fluo/recipes/export/ExportBufferIT.java | 106 ---
 .../fluo/recipes/export/ExportQueueIT.java | 114 ----
 .../fluo/recipes/export/ExportTestBase.java | 286 --------
 .../fluo/recipes/export/GsonSerializer.java | 42 --
 .../apache/fluo/recipes/export/OptionsTest.java | 51 --
 .../org/apache/fluo/recipes/export/RefInfo.java | 26 -
 .../apache/fluo/recipes/export/RefUpdates.java | 43 --
 .../apache/fluo/recipes/map/BigUpdateIT.java | 214 ------
 .../fluo/recipes/map/CollisionFreeMapIT.java | 361 ----------
 .../apache/fluo/recipes/map/DocumentLoader.java | 35 -
 .../fluo/recipes/map/DocumentObserver.java | 89 ---
.../apache/fluo/recipes/map/OptionsTest.java | 51 -- .../org/apache/fluo/recipes/map/SplitsTest.java | 75 --- .../apache/fluo/recipes/map/TestSerializer.java | 45 -- .../fluo/recipes/map/WordCountCombiner.java | 36 - .../fluo/recipes/map/WordCountObserver.java | 47 -- .../transaction/RecordingTransactionTest.java | 227 ------- .../apache/fluo/recipes/types/MockSnapshot.java | 30 - .../fluo/recipes/types/MockSnapshotBase.java | 202 ------ .../fluo/recipes/types/MockTransaction.java | 36 - .../fluo/recipes/types/MockTransactionBase.java | 90 --- .../fluo/recipes/types/TypeLayerTest.java | 494 -------------- .../recipes/kryo/KryoSimplerSerializer.java | 2 +- .../serialization/KryoSimpleSerializerTest.java | 45 ++ .../serialization/KryoSimpleSerializerTest.java | 45 -- .../fluo/recipes/test/AccumuloExportITBase.java | 2 +- .../recipes/test/export/AccumuloExporterIT.java | 2 +- .../test/export/AccumuloReplicatorIT.java | 10 +- 133 files changed, 7315 insertions(+), 7317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/export-queue.md ---------------------------------------------------------------------- diff --git a/docs/export-queue.md b/docs/export-queue.md index b85895f..0120113 100644 --- a/docs/export-queue.md +++ b/docs/export-queue.md @@ -279,7 +279,7 @@ example of write skew mentioned in the Percolater paper. 1. TH1 : tx1.set(`rowA`,`fam1:qual2`, val1) 1. 
TH2 : tx2.set(`rowB`,`fam1:qual2`, val2) -[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java +[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java [2]: https://en.wikipedia.org/wiki/Serializability [3]: accumulo-export.md http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/row-hasher.md ---------------------------------------------------------------------- diff --git a/docs/row-hasher.md b/docs/row-hasher.md index adb423f..8db8af6 100644 --- a/docs/row-hasher.md +++ b/docs/row-hasher.md @@ -31,8 +31,8 @@ balancing of the prefix. ```java import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.common.Pirtos; -import org.apache.fluo.recipes.data.RowHasher; +import org.apache.fluo.recipes.core.common.Pirtos; +import org.apache.fluo.recipes.core.data.RowHasher; public class RowHasherExample { http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/serialization.md ---------------------------------------------------------------------- diff --git a/docs/serialization.md b/docs/serialization.md index d7e3b42..9b664a0 100644 --- a/docs/serialization.md +++ b/docs/serialization.md @@ -69,5 +69,5 @@ how to do this. ``` [1]: https://github.com/EsotericSoftware/kryo -[2]: ../modules/core/src/main/java/org/apache/fluo/recipes/serialization/SimpleSerializer.java +[2]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java [3]: https://github.com/EsotericSoftware/kryo#registration http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/docs/transient.md ---------------------------------------------------------------------- diff --git a/docs/transient.md b/docs/transient.md index be68493..d0ac845 100644 --- a/docs/transient.md +++ b/docs/transient.md @@ -76,5 +76,5 @@ first range takes 20 seconds to compact, then it will be compacted again in 600 seconds. 
If the second range takes 80 seconds to compact, then it will be compacted again in 800 seconds. -[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java +[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java [2]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java index 7ce53c8..54e05bc 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java @@ -27,8 +27,8 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.recipes.accumulo.ops.TableOperations; -import org.apache.fluo.recipes.common.RowRange; -import org.apache.fluo.recipes.common.TransientRegistry; +import org.apache.fluo.recipes.core.common.RowRange; +import org.apache.fluo.recipes.core.common.TransientRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java index 
6b5276b..bf17cfd 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java @@ -19,7 +19,7 @@ import javax.inject.Inject; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.recipes.accumulo.ops.TableOperations; -import org.apache.fluo.recipes.common.Pirtos; +import org.apache.fluo.recipes.core.common.Pirtos; public class OptimizeTable { http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java index cd112eb..1788403 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java @@ -22,8 +22,8 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.observer.Observer.Context; -import org.apache.fluo.recipes.export.Exporter; -import org.apache.fluo.recipes.export.SequencedExport; +import org.apache.fluo.recipes.core.export.Exporter; +import org.apache.fluo.recipes.core.export.SequencedExport; /** * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java ---------------------------------------------------------------------- diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java index ec2f4ac..9a3b196 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java @@ -25,8 +25,8 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.transaction.LogEntry; -import org.apache.fluo.recipes.transaction.TxLog; +import org.apache.fluo.recipes.core.transaction.LogEntry; +import org.apache.fluo.recipes.core.transaction.TxLog; /** * An implementation of {@link AccumuloExport} that replicates a Fluo table to Accumulo using a http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java index e521014..62abd7e 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java @@ -28,9 +28,9 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.common.Pirtos; -import org.apache.fluo.recipes.common.RowRange; -import org.apache.fluo.recipes.common.TransientRegistry; +import 
org.apache.fluo.recipes.core.common.Pirtos; +import org.apache.fluo.recipes.core.common.RowRange; +import org.apache.fluo.recipes.core.common.TransientRegistry; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java deleted file mode 100644 index bdb9e60..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.common; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.export.ExportQueue; -import org.apache.fluo.recipes.map.CollisionFreeMap; - -/** - * Post initialization recommended table optimizations. - */ - -public class Pirtos { - private List<Bytes> splits = new ArrayList<>(); - private String tabletGroupingRegex = ""; - - public void setSplits(List<Bytes> splits) { - this.splits.clear(); - this.splits.addAll(splits); - } - - /** - * @return A recommended set of splits points to add to a Fluo table after initialization. - */ - public List<Bytes> getSplits() { - return Collections.unmodifiableList(splits); - } - - public void setTabletGroupingRegex(String tgr) { - Objects.requireNonNull(tgr); - this.tabletGroupingRegex = tgr; - } - - public String getTabletGroupingRegex() { - return "(" + tabletGroupingRegex + ").*"; - } - - public void merge(Pirtos other) { - splits.addAll(other.splits); - if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) { - tabletGroupingRegex += "|" + other.tabletGroupingRegex; - } else { - tabletGroupingRegex += other.tabletGroupingRegex; - } - } - - /** - * A utility method to get table optimizations for all configured recipes.
- */ - public static Pirtos getConfiguredOptimizations(FluoConfiguration fluoConfig) { - try (FluoClient client = FluoFactory.newClient(fluoConfig)) { - SimpleConfiguration appConfig = client.getAppConfiguration(); - Pirtos pirtos = new Pirtos(); - - pirtos.merge(ExportQueue.getTableOptimizations(appConfig)); - pirtos.merge(CollisionFreeMap.getTableOptimizations(appConfig)); - - return pirtos; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java deleted file mode 100644 index b1fbc23..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/common/RowRange.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.common; - -import java.util.Objects; - -import org.apache.fluo.api.data.Bytes; - -public class RowRange { - private final Bytes start; - private final Bytes end; - - public RowRange(Bytes start, Bytes end) { - Objects.requireNonNull(start); - Objects.requireNonNull(end); - this.start = start; - this.end = end; - } - - public Bytes getStart() { - return start; - } - - public Bytes getEnd() { - return end; - } - - @Override - public int hashCode() { - return start.hashCode() + 31 * end.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof RowRange) { - RowRange or = (RowRange) o; - return start.equals(or.start) && end.equals(or.end); - } - - return false; - } - - private static void encNonAscii(StringBuilder sb, Bytes bytes) { - if (bytes == null) { - sb.append("null"); - } else { - for (int i = 0; i < bytes.length(); i++) { - byte b = bytes.byteAt(i); - if (b >= 32 && b <= 126 && b != '\\') { - sb.append((char) b); - } else { - sb.append(String.format("\\x%02x", b & 0xff)); - } - } - } - } - - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("("); - encNonAscii(sb, start); - sb.append(", "); - encNonAscii(sb, end); - sb.append("]"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java b/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java deleted file mode 100644 index 681d783..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/common/TransientRegistry.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. 
See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.common; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import javax.xml.bind.DatatypeConverter; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.api.data.Bytes; - -/** - * This class offers a standard way to register transient ranges. The project level documentation - * provides a comprehensive overview. - */ - -public class TransientRegistry { - - private SimpleConfiguration appConfig; - private static final String PREFIX = "recipes.transientRange."; - - /** - * @param appConfig Fluo application config. Can be obtained from - * {@link FluoConfiguration#getAppConfiguration()} before initializing fluo when adding - * Transient ranges. After Fluo is initialized, app config can be obtained from - * {@link FluoClient#getAppConfiguration()} or - * {@link org.apache.fluo.api.observer.Observer.Context#getAppConfiguration()} - */ - public TransientRegistry(SimpleConfiguration appConfig) { - this.appConfig = appConfig; - } - - /** - * This method is expected to be called before Fluo is initialized to register transient ranges. 
- * - */ - public void addTransientRange(String id, RowRange range) { - String start = DatatypeConverter.printHexBinary(range.getStart().toArray()); - String end = DatatypeConverter.printHexBinary(range.getEnd().toArray()); - - appConfig.setProperty(PREFIX + id, start + ":" + end); - } - - /** - * This method is expected to be called after Fluo is initialized to get the ranges that were - * registered before initialization. - */ - public List<RowRange> getTransientRanges() { - List<RowRange> ranges = new ArrayList<>(); - Iterator<String> keys = appConfig.getKeys(PREFIX.substring(0, PREFIX.length() - 1)); - while (keys.hasNext()) { - String key = keys.next(); - String val = appConfig.getString(key); - String[] sa = val.split(":"); - RowRange rowRange = - new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])), - Bytes.of(DatatypeConverter.parseHexBinary(sa[1]))); - ranges.add(rowRange); - } - return ranges; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java new file mode 100644 index 0000000..5488e29 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.common; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.recipes.core.export.ExportQueue; +import org.apache.fluo.recipes.core.map.CollisionFreeMap; + +/** + * Post initialization recommended table optimizations. + */ + +public class Pirtos { + private List<Bytes> splits = new ArrayList<>(); + private String tabletGroupingRegex = ""; + + public void setSplits(List<Bytes> splits) { + this.splits.clear(); + this.splits.addAll(splits); + } + + /** + * @return A recommended set of splits points to add to a Fluo table after initialization. + */ + public List<Bytes> getSplits() { + return Collections.unmodifiableList(splits); + } + + public void setTabletGroupingRegex(String tgr) { + Objects.requireNonNull(tgr); + this.tabletGroupingRegex = tgr; + } + + public String getTabletGroupingRegex() { + return "(" + tabletGroupingRegex + ").*"; + } + + public void merge(Pirtos other) { + splits.addAll(other.splits); + if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) { + tabletGroupingRegex += "|" + other.tabletGroupingRegex; + } else { + tabletGroupingRegex += other.tabletGroupingRegex; + } + } + + /** + * A utility method to get table optimizations for all configured recipes.
+ */ + public static Pirtos getConfiguredOptimizations(FluoConfiguration fluoConfig) { + try (FluoClient client = FluoFactory.newClient(fluoConfig)) { + SimpleConfiguration appConfig = client.getAppConfiguration(); + Pirtos pirtos = new Pirtos(); + + pirtos.merge(ExportQueue.getTableOptimizations(appConfig)); + pirtos.merge(CollisionFreeMap.getTableOptimizations(appConfig)); + + return pirtos; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java new file mode 100644 index 0000000..913357b --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/RowRange.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.common; + +import java.util.Objects; + +import org.apache.fluo.api.data.Bytes; + +public class RowRange { + private final Bytes start; + private final Bytes end; + + public RowRange(Bytes start, Bytes end) { + Objects.requireNonNull(start); + Objects.requireNonNull(end); + this.start = start; + this.end = end; + } + + public Bytes getStart() { + return start; + } + + public Bytes getEnd() { + return end; + } + + @Override + public int hashCode() { + return start.hashCode() + 31 * end.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof RowRange) { + RowRange or = (RowRange) o; + return start.equals(or.start) && end.equals(or.end); + } + + return false; + } + + private static void encNonAscii(StringBuilder sb, Bytes bytes) { + if (bytes == null) { + sb.append("null"); + } else { + for (int i = 0; i < bytes.length(); i++) { + byte b = bytes.byteAt(i); + if (b >= 32 && b <= 126 && b != '\\') { + sb.append((char) b); + } else { + sb.append(String.format("\\x%02x", b & 0xff)); + } + } + } + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("("); + encNonAscii(sb, start); + sb.append(", "); + encNonAscii(sb, end); + sb.append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java new file mode 100644 index 0000000..9533a7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * 
agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.common; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.data.Bytes; + +/** + * This class offers a standard way to register transient ranges. The project level documentation + * provides a comprehensive overview. + */ + +public class TransientRegistry { + + private SimpleConfiguration appConfig; + private static final String PREFIX = "recipes.transientRange."; + + /** + * @param appConfig Fluo application config. Can be obtained from + * {@link FluoConfiguration#getAppConfiguration()} before initializing fluo when adding + * Transient ranges. After Fluo is initialized, app config can be obtained from + * {@link FluoClient#getAppConfiguration()} or + * {@link org.apache.fluo.api.observer.Observer.Context#getAppConfiguration()} + */ + public TransientRegistry(SimpleConfiguration appConfig) { + this.appConfig = appConfig; + } + + /** + * This method is expected to be called before Fluo is initialized to register transient ranges. 
+ * + */ + public void addTransientRange(String id, RowRange range) { + String start = DatatypeConverter.printHexBinary(range.getStart().toArray()); + String end = DatatypeConverter.printHexBinary(range.getEnd().toArray()); + + appConfig.setProperty(PREFIX + id, start + ":" + end); + } + + /** + * This method is expected to be called after Fluo is initialized to get the ranges that were + * registered before initialization. + */ + public List<RowRange> getTransientRanges() { + List<RowRange> ranges = new ArrayList<>(); + Iterator<String> keys = appConfig.getKeys(PREFIX.substring(0, PREFIX.length() - 1)); + while (keys.hasNext()) { + String key = keys.next(); + String val = appConfig.getString(key); + String[] sa = val.split(":"); + RowRange rowRange = + new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])), + Bytes.of(DatatypeConverter.parseHexBinary(sa[1]))); + ranges.add(rowRange); + } + return ranges; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java new file mode 100644 index 0000000..e09bed5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.data; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.BytesBuilder; +import org.apache.fluo.recipes.core.common.Pirtos; + +/** + * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe + * rows are structured like the following. + * + * <p> + * {@code <prefix>:<fixed len row hash>:<user row>} + * <p> + * The recipe also provides code to help generate split points and configure balancing of the + * prefix. + * <p>
+ * The project documentation has more information. + */ +public class RowHasher { + + private static final int HASH_LEN = 4; + + public Pirtos getTableOptimizations(int numTablets) { + + List<Bytes> splits = new ArrayList<>(numTablets - 1); + + int numSplits = numTablets - 1; + int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1; + int split = distance; + for (int i = 0; i < numSplits; i++) { + splits.add(Bytes.of(prefix + + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0'))); + split += distance; + } + + splits.add(Bytes.of(prefix + "~")); + + + Pirtos pirtos = new Pirtos(); + pirtos.setSplits(splits); + pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString())); + + return pirtos; + } + + + private Bytes prefix; + + public RowHasher(String prefix) { + this.prefix = Bytes.of(prefix + ":"); + } + + /** + * @return Returns input with prefix and hash of input prepended. + */ + public Bytes addHash(String row) { + return addHash(Bytes.of(row)); + } + + /** + * @return Returns input with prefix and hash of input prepended. + */ + public Bytes addHash(Bytes row) { + BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length()); + builder.append(prefix); + builder.append(genHash(row)); + builder.append(":"); + builder.append(row); + return builder.toBytes(); + } + + private boolean hasHash(Bytes row) { + for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) { + byte b = row.byteAt(i); + boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9'); + if (!isAlphaNum) { + return false; + } + } + + if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') { + return false; + } + + return true; + } + + /** + * @return Returns input with prefix and hash stripped from beginning.
+ */ + public Bytes removeHash(Bytes row) { + Preconditions.checkArgument(row.length() >= prefix.length() + 5, + "Row is shorter than expected " + row); + Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix), + "Row does not have expected prefix " + row); + Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row); + return row.subSequence(prefix.length() + 5, row.length()); + } + + private static String genHash(Bytes row) { + int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt(); + hash = hash & 0x7fffffff; + // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is + // nice for debugging. + String hashString = + Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0'); + hashString = hashString.substring(hashString.length() - HASH_LEN); + + return hashString; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java new file mode 100644 index 0000000..4f14f65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Export.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export; + +import java.util.Objects; + +public class Export<K, V> { + private final K key; + private final V value; + + public Export(K key, V val) { + Objects.requireNonNull(key); + Objects.requireNonNull(val); + this.key = key; + this.value = val; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java new file mode 100644 index 0000000..cf6dfb4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map.Entry; + +import com.google.common.base.Preconditions; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.config.ScannerConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.api.iterator.ColumnIterator; +import org.apache.fluo.api.iterator.RowIterator; +import org.apache.fluo.recipes.core.impl.BucketUtil; +import org.apache.fluo.recipes.core.types.StringEncoder; +import org.apache.fluo.recipes.core.types.TypeLayer; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; + +/** + * This class encapsulates a bucket's serialization code.
+ */ +class ExportBucket { + private static final String NOTIFICATION_CF = "fluoRecipes"; + private static final String NOTIFICATION_CQ_PREFIX = "eq:"; + private static final Column EXPORT_COL = new Column("e", "v"); + private static final Column NEXT_COL = new Column("e", "next"); + + static Column newNotificationColumn(String queueId) { + return new Column(NOTIFICATION_CF, NOTIFICATION_CQ_PREFIX + queueId); + } + + private final TypedTransactionBase ttx; + private final String qid; + private final Bytes bucketRow; + + static Bytes generateBucketRow(String qid, int bucket, int numBuckets) { + return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets)); + } + + ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) { + // TODO encode in a more robust way... but for now fail early + Preconditions.checkArgument(!qid.contains(":"), "Export QID can not contain :"); + this.ttx = new TypeLayer(new StringEncoder()).wrap(tx); + this.qid = qid; + this.bucketRow = generateBucketRow(qid, bucket, numBuckets); + } + + ExportBucket(TransactionBase tx, Bytes bucketRow) { + this.ttx = new TypeLayer(new StringEncoder()).wrap(tx); + + int colonLoc = -1; + + for (int i = 0; i < bucketRow.length(); i++) { + if (bucketRow.byteAt(i) == ':') { + colonLoc = i; + break; + } + } + + Preconditions.checkArgument(colonLoc != -1 && colonLoc != bucketRow.length(), + "Invalid bucket row " + bucketRow); + Preconditions.checkArgument(bucketRow.byteAt(bucketRow.length() - 1) == ':', + "Invalid bucket row " + bucketRow); + + this.bucketRow = bucketRow.subSequence(0, bucketRow.length() - 1); + this.qid = bucketRow.subSequence(0, colonLoc).toString(); + } + + private static byte[] encSeq(long l) { + byte[] ret = new byte[8]; + ret[0] = (byte) (l >>> 56); + ret[1] = (byte) (l >>> 48); + ret[2] = (byte) (l >>> 40); + ret[3] = (byte) (l >>> 32); + ret[4] = (byte) (l >>> 24); + ret[5] = (byte) (l >>> 16); + ret[6] = (byte) (l >>> 8); + ret[7] = (byte) (l >>> 0); + return ret; 
+ } + + private static long decodeSeq(Bytes seq) { + return (((long) seq.byteAt(0) << 56) + ((long) (seq.byteAt(1) & 255) << 48) + + ((long) (seq.byteAt(2) & 255) << 40) + ((long) (seq.byteAt(3) & 255) << 32) + + ((long) (seq.byteAt(4) & 255) << 24) + ((seq.byteAt(5) & 255) << 16) + + ((seq.byteAt(6) & 255) << 8) + ((seq.byteAt(7) & 255) << 0)); + } + + + public void add(long seq, byte[] key, byte[] value) { + Bytes row = + Bytes.newBuilder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":") + .append(key).append(encSeq(seq)).toBytes(); + ttx.set(row, EXPORT_COL, Bytes.of(value)); + } + + /** + * Computes the minimal row for a bucket + */ + private Bytes getMinimalRow() { + return Bytes.newBuilder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes(); + } + + public void notifyExportObserver() { + ttx.mutate().row(getMinimalRow()).col(newNotificationColumn(qid)).weaklyNotify(); + } + + public Iterator<ExportEntry> getExportIterator(Bytes continueRow) { + ScannerConfiguration sc = new ScannerConfiguration(); + + if (continueRow != null) { + Span tmpSpan = Span.prefix(bucketRow); + Span nextSpan = + new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(), + tmpSpan.isEndInclusive()); + sc.setSpan(nextSpan); + } else { + sc.setSpan(Span.prefix(bucketRow)); + } + + sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier()); + RowIterator iter = ttx.get(sc); + + if (iter.hasNext()) { + return new ExportIterator(iter); + } else { + return Collections.<ExportEntry>emptySet().iterator(); + } + } + + private class ExportIterator implements Iterator<ExportEntry> { + + private RowIterator rowIter; + private Bytes lastRow; + + public ExportIterator(RowIterator rowIter) { + this.rowIter = rowIter; + } + + @Override + public boolean hasNext() { + return rowIter.hasNext(); + } + + @Override + public ExportEntry next() { + Entry<Bytes, ColumnIterator> rowCol = rowIter.next(); + Bytes row = rowCol.getKey(); + + Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8); +
Bytes seqBytes = row.subSequence(row.length() - 8, row.length()); + + ExportEntry ee = new ExportEntry(); + + ee.key = keyBytes.toArray(); + ee.seq = decodeSeq(seqBytes); + // TODO maybe leave as Bytes? + ee.value = rowCol.getValue().next().getValue().toArray(); + + lastRow = row; + + return ee; + } + + @Override + public void remove() { + ttx.mutate().row(lastRow).col(EXPORT_COL).delete(); + } + } + + public Bytes getContinueRow() { + return ttx.get(getMinimalRow(), NEXT_COL); + } + + public void setContinueRow(ExportEntry ee) { + Bytes nextRow = + Bytes.newBuilder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":") + .append(ee.key).append(encSeq(ee.seq)).toBytes(); + + ttx.set(getMinimalRow(), NEXT_COL, nextRow); + } + + public void clearContinueRow() { + ttx.delete(getMinimalRow(), NEXT_COL); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java new file mode 100644 index 0000000..e680204 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportEntry.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.export; + +class ExportEntry { + byte[] key; + long seq; + byte[] value; +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java new file mode 100644 index 0000000..940575b --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.export; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.google.common.collect.Iterators; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; + +public class ExportObserver<K, V> extends AbstractObserver { + + private static class MemLimitIterator implements Iterator<ExportEntry> { + + private long memConsumed = 0; + private long memLimit; + private int extraPerKey; + private Iterator<ExportEntry> source; + + public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) { + this.source = input; + this.memLimit = limit; + this.extraPerKey = extraPerKey; + } + + @Override + public boolean hasNext() { + return memConsumed < memLimit && source.hasNext(); + } + + @Override + public ExportEntry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + ExportEntry ee = source.next(); + memConsumed += ee.key.length + extraPerKey + ee.value.length; + return ee; + } + + @Override + public void remove() { + source.remove(); + } + } + + private String queueId; + private Class<K> keyType; + private Class<V> valType; + SimpleSerializer serializer; + private Exporter<K, V> exporter; + + private long memLimit; + + protected String getQueueId() { + return queueId; + } + + SimpleSerializer getSerializer() { + return serializer; + } + + @SuppressWarnings("unchecked") + @Override + public void init(Context context) throws Exception { + queueId = context.getParameters().get("queueId"); + ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration()); + + // TODO defer loading classes... so that not done during fluo init + // TODO move class loading to centralized place...
also attempt to check type params + keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType); + valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType); + exporter = + getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class) + .newInstance(); + + serializer = SimpleSerializer.getInstance(context.getAppConfiguration()); + + memLimit = opts.getBufferSize(); + + exporter.init(queueId, context); + } + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK); + } + + @Override + public void process(TransactionBase tx, Bytes row, Column column) throws Exception { + ExportBucket bucket = new ExportBucket(tx, row); + + Bytes continueRow = bucket.getContinueRow(); + + Iterator<ExportEntry> input = bucket.getExportIterator(continueRow); + MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length()); + + Iterator<SequencedExport<K, V>> exportIterator = + Iterators.transform( + memLimitIter, + ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer + .deserialize(ee.value, valType), ee.seq)); + + exportIterator = Iterators.consumingIterator(exportIterator); + + exporter.processExports(exportIterator); + + if (input.hasNext()) { + // not everything was processed so notify self + bucket.notifyExportObserver(); + + if (!memLimitIter.hasNext()) { + // stopped because of mem limit...
set continue key + bucket.setContinueRow(input.next()); + continueRow = null; + } + } + + if (continueRow != null) { + bucket.clearContinueRow(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java new file mode 100644 index 0000000..f013872 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.export; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +import com.google.common.base.Preconditions; +import com.google.common.hash.Hashing; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.recipes.core.common.Pirtos; +import org.apache.fluo.recipes.core.common.RowRange; +import org.apache.fluo.recipes.core.common.TransientRegistry; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; + +public class ExportQueue<K, V> { + + private static final String RANGE_BEGIN = "#"; + private static final String RANGE_END = ":~"; + + private int numBuckets; + private SimpleSerializer serializer; + private String queueId; + + // usage hint : could be created once in an observer's init method + // usage hint : maybe have a queue for each type of data being exported??? + // maybe fewer queues are + // more efficient though because more batching at export time?? + ExportQueue(Options opts, SimpleSerializer serializer) throws Exception { + // TODO sanity check key type based on type params + // TODO defer creating classes until needed..
so that it's not done during Fluo init + this.queueId = opts.queueId; + this.numBuckets = opts.numBuckets; + this.serializer = serializer; + } + + public void add(TransactionBase tx, K key, V value) { + addAll(tx, Collections.singleton(new Export<>(key, value)).iterator()); + } + + public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) { + + Set<Integer> bucketsNotified = new HashSet<>(); + while (exports.hasNext()) { + Export<K, V> export = exports.next(); + + byte[] k = serializer.serialize(export.getKey()); + byte[] v = serializer.serialize(export.getValue()); + + int hash = Hashing.murmur3_32().hashBytes(k).asInt(); + int bucketId = Math.abs(hash % numBuckets); + + ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets); + bucket.add(tx.getStartTimestamp(), k, v); + + if (!bucketsNotified.contains(bucketId)) { + bucket.notifyExportObserver(); + bucketsNotified.add(bucketId); + } + } + } + + public static <K, V> ExportQueue<K, V> getInstance(String exportQueueId, + SimpleConfiguration appConfig) { + Options opts = new Options(exportQueueId, appConfig); + try { + return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig)); + } catch (Exception e) { + // TODO + throw new RuntimeException(e); + } + } + + /** + * Call this method before initializing Fluo. + * + * @param fluoConfig The configuration that will be used to initialize fluo. + */ + public static void configure(FluoConfiguration fluoConfig, Options opts) { + SimpleConfiguration appConfig = fluoConfig.getAppConfiguration(); + opts.save(appConfig); + + fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName()) + .setParameters(Collections.singletonMap("queueId", opts.queueId))); + + Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN); + Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END); + + new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
+ opts.queueId, new RowRange(exportRangeStart, exportRangeStop)); + } + + /** + * Return suggested Fluo table optimizations for all previously configured export queues. + * + * @param appConfig Must pass in the application configuration obtained from + * {@code FluoClient.getAppConfiguration()} or + * {@code FluoConfiguration.getAppConfiguration()} + */ + + public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) { + HashSet<String> queueIds = new HashSet<>(); + appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining( + k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0])); + + Pirtos pirtos = new Pirtos(); + queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig))); + + return pirtos; + } + + /** + * Return suggested Fluo table optimizations for the specified export queue. + * + * @param appConfig Must pass in the application configuration obtained from + * {@code FluoClient.getAppConfiguration()} or + * {@code FluoConfiguration.getAppConfiguration()} + */ + public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) { + Options opts = new Options(queueId, appConfig); + + List<Bytes> splits = new ArrayList<>(); + + Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN); + Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END); + + splits.add(exportRangeStart); + splits.add(exportRangeStop); + + List<Bytes> exportSplits = new ArrayList<>(); + for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { + exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets)); + } + Collections.sort(exportSplits); + splits.addAll(exportSplits); + + Pirtos pirtos = new Pirtos(); + pirtos.setSplits(splits); + + // the tablet with end row # does not contain any data for the export queue and + // should not be grouped with the export queue + pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":")); + +
return pirtos; + } + + public static class Options { + + private static final String PREFIX = "recipes.exportQueue."; + static final long DEFAULT_BUFFER_SIZE = 1 << 20; + static final int DEFAULT_BUCKETS_PER_TABLET = 10; + + int numBuckets; + Integer bucketsPerTablet = null; + Long bufferSize; + + String keyType; + String valueType; + String exporterType; + String queueId; + + Options(String queueId, SimpleConfiguration appConfig) { + this.queueId = queueId; + + this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets"); + this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter"); + this.keyType = appConfig.getString(PREFIX + queueId + ".key"); + this.valueType = appConfig.getString(PREFIX + queueId + ".val"); + this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE); + this.bucketsPerTablet = + appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET); + } + + public Options(String queueId, String exporterType, String keyType, String valueType, + int buckets) { + Preconditions.checkArgument(buckets > 0); + + this.queueId = queueId; + this.numBuckets = buckets; + this.exporterType = exporterType; + this.keyType = keyType; + this.valueType = valueType; + } + + + public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter, + Class<K> keyType, Class<V> valueType, int buckets) { + this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets); + } + + /** + * Sets a limit on the amount of serialized updates to read into memory. Additional memory will + * be used to actually deserialize and process the updates. This limit does not account for + * object overhead in java, which can be significant. + * + * <p>
+ * The way memory read is calculated is by summing the length of serialized key and value byte + * arrays. Once this sum exceeds the configured memory limit, no more export key values are + * processed in the current transaction. When not everything is processed, the observer + * processing exports will notify itself causing another transaction to continue processing + * later. + */ + public Options setBufferSize(long bufferSize) { + Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive"); + this.bufferSize = bufferSize; + return this; + } + + long getBufferSize() { + if (bufferSize == null) { + return DEFAULT_BUFFER_SIZE; + } + + return bufferSize; + } + + /** + * Sets the number of buckets per tablet to generate. This affects how many split points will be + * generated when optimizing the Accumulo table. + * + */ + public Options setBucketsPerTablet(int bucketsPerTablet) { + Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " + + bucketsPerTablet); + this.bucketsPerTablet = bucketsPerTablet; + return this; + } + + int getBucketsPerTablet() { + if (bucketsPerTablet == null) { + return DEFAULT_BUCKETS_PER_TABLET; + } + + return bucketsPerTablet; + } + + void save(SimpleConfiguration appConfig) { + appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + ""); + appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + ""); + appConfig.setProperty(PREFIX + queueId + ".key", keyType); + appConfig.setProperty(PREFIX + queueId + ".val", valueType); + + if (bufferSize != null) { + appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize); + } + if (bucketsPerTablet != null) { + appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java ---------------------------------------------------------------------- 
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
new file mode 100644
index 0000000..529d5f3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+public abstract class Exporter<K, V> {
+
+  public void init(String queueId, Context observerContext) throws Exception {}
+
+  /**
+   * Must be able to handle same key being exported multiple times and key being exported out of
+   * order. The sequence number is meant to help with this.
+   *
+   * <p>
+   * If multiple export entries with the same key are passed in, then the entries with the same key
+   * will be consecutive and in ascending sequence order.
+   *
+   * <p>
+   * If the call to process exports is unexpectedly terminated, it will be called again later with
+   * at least the same data. For example suppose an exporter was passed the following entries.
+   *
+   * <ul>
+   * <li>key=0 sequence=9 value=abc
+   * <li>key=1 sequence=13 value=d
+   * <li>key=1 sequence=17 value=e
+   * <li>key=1 sequence=23 value=f
+   * <li>key=2 sequence=19 value=x
+   * </ul>
+   *
+   * <p>
+   * Assume the exporter exports some of these and then fails before completing all of them. The
+   * next time it's called it will be passed what it saw before, but it could also be passed more.
+   *
+   * <ul>
+   * <li>key=0 sequence=9 value=abc
+   * <li>key=1 sequence=13 value=d
+   * <li>key=1 sequence=17 value=e
+   * <li>key=1 sequence=23 value=f
+   * <li>key=1 sequence=29 value=g
+   * <li>key=2 sequence=19 value=x
+   * <li>key=2 sequence=77 value=y
+   * </ul>
+   *
+   */
+  protected abstract void processExports(Iterator<SequencedExport<K, V>> exports);
+
+  // TODO add close
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
new file mode 100644
index 0000000..ef1cfe2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/SequencedExport.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+public class SequencedExport<K, V> extends Export<K, V> {
+  private final long seq;
+
+  SequencedExport(K k, V v, long seq) {
+    super(k, v);
+    this.seq = seq;
+  }
+
+  public long getSequence() {
+    return seq;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
new file mode 100644
index 0000000..06bdfa8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/impl/BucketUtil.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.impl;
+
+public class BucketUtil {
+  public static String genBucketId(int bucket, int maxBucket) {
+    int bucketLen = Integer.toHexString(maxBucket).length();
+    // TODO printf is slow
+    return String.format("%0" + bucketLen + "x", bucket);
+  }
+}
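The `Exporter` Javadoc in this commit requires an implementation to tolerate the same key arriving more than once and out of order. It also promises only that a retry resends at least the same data. The sketch below shows one way to meet that contract. It assumes the destination can remember the highest sequence number applied per key. `ExporterReplayDemo`, `ExportEntry` (a stand-in for `SequencedExport`) and `SequenceGuardedStore` are hypothetical names, not Fluo Recipes API, and the sketch does not extend the real `Exporter` class. Its `main` method replays the two entry lists from the Javadoc: a first call that fails after three entries, followed by a retry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ExporterReplayDemo {

  // Hypothetical stand-in for SequencedExport<K, V>: a key, a value and a sequence number.
  static final class ExportEntry<K, V> {
    final K key;
    final V value;
    final long seq;

    ExportEntry(K key, V value, long seq) {
      this.key = key;
      this.value = value;
      this.seq = seq;
    }
  }

  // Hypothetical destination that remembers the highest sequence number applied per key and
  // skips any update that is not newer, so replays and stale updates become no-ops.
  static final class SequenceGuardedStore<K, V> {
    private final Map<K, Long> lastSeq = new HashMap<>();
    private final Map<K, V> values = new HashMap<>();
    private int applied = 0;

    void processExports(Iterator<ExportEntry<K, V>> exports) {
      while (exports.hasNext()) {
        ExportEntry<K, V> e = exports.next();
        Long prev = lastSeq.get(e.key);
        if (prev != null && prev >= e.seq) {
          continue; // this update, or a newer one, was already applied
        }
        lastSeq.put(e.key, e.seq);
        values.put(e.key, e.value);
        applied++;
      }
    }

    V get(K key) {
      return values.get(key);
    }

    int appliedCount() {
      return applied;
    }
  }

  public static void main(String[] args) {
    SequenceGuardedStore<Integer, String> store = new SequenceGuardedStore<>();

    // First call: only the first three entries are applied before a simulated failure.
    List<ExportEntry<Integer, String>> first = Arrays.asList(
        new ExportEntry<>(0, "abc", 9), new ExportEntry<>(1, "d", 13),
        new ExportEntry<>(1, "e", 17), new ExportEntry<>(1, "f", 23),
        new ExportEntry<>(2, "x", 19));
    store.processExports(first.subList(0, 3).iterator());

    // Retry: at least the same data, plus newer entries for keys 1 and 2.
    List<ExportEntry<Integer, String>> retry = Arrays.asList(
        new ExportEntry<>(0, "abc", 9), new ExportEntry<>(1, "d", 13),
        new ExportEntry<>(1, "e", 17), new ExportEntry<>(1, "f", 23),
        new ExportEntry<>(1, "g", 29), new ExportEntry<>(2, "x", 19),
        new ExportEntry<>(2, "y", 77));
    store.processExports(retry.iterator());

    // prints "abc g y 7"
    System.out.println(store.get(0) + " " + store.get(1) + " " + store.get(2) + " "
        + store.appliedCount());
  }
}
```

The in-memory map only illustrates the check. A real exporter would probably need the per-key sequence number stored durably in the external system, for example next to the exported value, so that the guard survives the failure that triggers the retry.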