Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AC2841895E for ; Fri, 24 Apr 2015 23:20:48 +0000 (UTC) Received: (qmail 31846 invoked by uid 500); 24 Apr 2015 23:20:48 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 31749 invoked by uid 500); 24 Apr 2015 23:20:48 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 31076 invoked by uid 99); 24 Apr 2015 23:20:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Apr 2015 23:20:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13A8AE18F2; Fri, 24 Apr 2015 23:20:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Fri, 24 Apr 2015 23:20:57 -0000 Message-Id: <545dbef838df43daa19a4b415b47f7c2@git.apache.org> In-Reply-To: <11fdb148eaf74d94adaf4c8d004a2058@git.apache.org> References: <11fdb148eaf74d94adaf4c8d004a2058@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/34] accumulo git commit: ACCUMULO-3423 merging master ACCUMULO-3423 merging master Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1591f03 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1591f03 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1591f03 Branch: refs/heads/master Commit: f1591f033ed010f490686b836fe6d6d52cb98238 Parents: f4f28c7 5c670fa Author: Eric C. Newton Authored: Tue Mar 17 08:49:11 2015 -0400 Committer: Eric C. Newton Committed: Tue Mar 17 08:49:11 2015 -0400 ---------------------------------------------------------------------- TESTING.md | 28 +- core/pom.xml | 27 - core/src/main/findbugs/exclude-filter.xml | 46 +- .../accumulo/core/bloomfilter/BloomFilter.java | 3 - .../client/admin/ReplicationOperations.java | 13 +- .../core/client/impl/ClientContext.java | 4 +- .../core/client/impl/ConditionalWriterImpl.java | 9 +- .../client/impl/NamespaceOperationsImpl.java | 8 +- .../client/impl/ReplicationOperationsImpl.java | 125 +- .../core/client/impl/TableOperationsImpl.java | 17 + .../core/client/impl/TabletLocator.java | 1 - .../client/mapreduce/AbstractInputFormat.java | 16 +- .../core/client/replication/ReplicaSystem.java | 50 - .../replication/ReplicaSystemFactory.java | 82 -- .../tokens/CredentialProviderToken.java | 2 +- .../client/security/tokens/DelegationToken.java | 2 +- .../client/security/tokens/KerberosToken.java | 7 +- .../core/client/security/tokens/NullToken.java | 6 +- .../accumulo/core/conf/SiteConfiguration.java | 4 +- .../core/file/blockfile/cache/CachedBlock.java | 14 + .../core/file/blockfile/cache/ClassSize.java | 47 +- .../file/blockfile/cache/LruBlockCache.java | 6 + .../accumulo/core/iterators/OrIterator.java | 12 + .../core/iterators/user/AgeOffFilter.java | 1 - .../core/iterators/user/TimestampFilter.java | 4 +- .../user/WholeColumnFamilyIterator.java | 6 +- .../core/master/thrift/MasterClientService.java | 1213 ++++++++++++++++++ .../replication/PrintReplicationRecords.java | 97 -- .../core/replication/ReplicaSystemHelper.java | 72 -- .../core/replication/ReplicationTable.java | 1 - .../core/replication/StatusFormatter.java | 187 --- .../accumulo/core/replication/StatusUtil.java | 225 ---- .../core/replication/proto/Replication.java | 949 -------------- .../apache/accumulo/core/rpc/ThriftUtil.java | 8 +- .../CachingHDFSSecretKeyEncryptionStrategy.java | 2 +- .../apache/accumulo/core/util/CreateToken.java | 6 +- .../org/apache/accumulo/core/util/Version.java | 6 +- .../core/util/format/DefaultFormatter.java | 35 +- core/src/main/protobuf/replication.proto | 26 - core/src/main/scripts/generate-protobuf.sh | 98 -- core/src/main/thrift/master.thrift | 3 + .../accumulo/core/cli/TestClientOpts.java | 6 +- .../client/impl/TableOperationsHelperTest.java | 4 +- .../mapred/AccumuloFileOutputFormatTest.java | 15 +- .../mapred/AccumuloRowInputFormatTest.java | 8 +- .../mapreduce/AccumuloFileOutputFormatTest.java | 11 +- .../mapreduce/AccumuloRowInputFormatTest.java | 8 +- .../conf/CredentialProviderFactoryShimTest.java | 8 +- .../iterators/aggregation/NumSummationTest.java | 17 +- .../ReplicationOperationsImplTest.java | 409 ------ .../core/replication/StatusUtilTest.java | 57 - .../core/replication/proto/StatusTest.java | 36 - .../core/util/LocalityGroupUtilTest.java | 4 +- .../core/util/format/HexFormatterTest.java | 2 +- .../simple/src/main/findbugs/exclude-filter.xml | 5 + .../simple/client/RandomBatchScanner.java | 3 +- .../examples/simple/client/RowOperations.java | 30 +- .../simple/filedata/ChunkInputFormatTest.java | 16 +- .../simple/filedata/ChunkInputStreamTest.java | 40 +- fate/src/main/findbugs/exclude-filter.xml | 5 + .../apache/accumulo/maven/plugin/StartMojo.java | 4 +- .../impl/MiniAccumuloClusterImpl.java | 28 +- .../MiniAccumuloClusterStartStopTest.java | 6 +- .../minicluster/MiniAccumuloClusterTest.java | 5 +- .../impl/MiniAccumuloClusterImplTest.java | 6 +- pom.xml | 2 +- proxy/src/main/findbugs/exclude-filter.xml | 7 +- .../org/apache/accumulo/proxy/ProxyServer.java | 30 +- server/base/pom.xml | 29 + .../base/src/main/findbugs/exclude-filter.xml | 12 + .../server/constraints/MetadataConstraints.java | 2 +- .../accumulo/server/data/ServerMutation.java | 13 +- .../apache/accumulo/server/fs/VolumeUtil.java | 4 +- .../apache/accumulo/server/init/Initialize.java | 3 +- .../iterators/MetadataBulkLoadFilter.java | 2 +- .../accumulo/server/master/LiveTServerSet.java | 25 +- .../master/balancer/DefaultLoadBalancer.java | 11 + .../server/metrics/AbstractMetricsImpl.java | 5 +- .../replication/PrintReplicationRecords.java | 98 ++ .../server/replication/ReplicaSystem.java | 49 + .../replication/ReplicaSystemFactory.java | 82 ++ .../server/replication/ReplicaSystemHelper.java | 74 ++ .../server/replication/ReplicationUtil.java | 6 +- .../server/replication/StatusCombiner.java | 6 +- .../server/replication/StatusFormatter.java | 170 +++ .../accumulo/server/replication/StatusUtil.java | 225 ++++ .../server/replication/proto/Replication.java | 949 ++++++++++++++ .../org/apache/accumulo/server/util/Admin.java | 16 +- .../server/util/ReplicationTableUtil.java | 4 +- server/base/src/main/protobuf/replication.proto | 26 + .../base/src/main/scripts/generate-protobuf.sh | 98 ++ .../master/balancer/GroupBalancerTest.java | 2 +- .../master/balancer/TableLoadBalancerTest.java | 8 +- .../server/replication/StatusCombinerTest.java | 6 +- .../server/replication/StatusUtilTest.java | 54 + .../server/replication/proto/StatusTest.java | 36 + .../server/security/SystemCredentialsTest.java | 8 +- .../accumulo/server/util/FileUtilTest.java | 39 +- .../server/util/ReplicationTableUtilTest.java | 7 +- server/gc/src/main/findbugs/exclude-filter.xml | 5 + .../gc/GarbageCollectWriteAheadLogs.java | 4 +- .../accumulo/gc/GarbageCollectionAlgorithm.java | 4 +- .../gc/GarbageCollectionEnvironment.java | 2 +- .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../CloseWriteAheadLogReferences.java | 4 +- .../accumulo/gc/GarbageCollectionTest.java | 4 +- .../CloseWriteAheadLogReferencesTest.java | 4 +- .../master/src/main/findbugs/exclude-filter.xml | 20 + .../accumulo/master/FateServiceHandler.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 13 +- .../master/MasterClientServiceHandler.java | 111 +- .../accumulo/master/TabletGroupWatcher.java | 3 +- .../DistributedWorkQueueWorkAssigner.java | 5 +- .../master/replication/FinishedWorkUpdater.java | 4 +- .../RemoveCompleteReplicationRecords.java | 4 +- .../master/replication/StatusMaker.java | 2 +- .../accumulo/master/replication/WorkMaker.java | 4 +- .../accumulo/master/tableOps/BulkImport.java | 7 +- .../accumulo/master/tableOps/ImportTable.java | 39 +- .../accumulo/master/tableOps/TraceRepo.java | 2 +- .../master/ReplicationOperationsImplTest.java | 452 +++++++ .../replication/FinishedWorkUpdaterTest.java | 5 +- .../RemoveCompleteReplicationRecordsTest.java | 7 +- .../replication/SequentialWorkAssignerTest.java | 5 +- .../master/replication/StatusMakerTest.java | 7 +- .../replication/UnorderedWorkAssignerTest.java | 4 +- .../master/replication/WorkMakerTest.java | 7 +- .../accumulo/monitor/ZooKeeperStatus.java | 11 + .../accumulo/monitor/servlets/BasicServlet.java | 6 +- .../accumulo/monitor/servlets/JSONServlet.java | 4 +- .../accumulo/monitor/servlets/LogServlet.java | 2 + .../monitor/servlets/ProblemServlet.java | 17 +- .../monitor/servlets/ReplicationServlet.java | 25 +- .../accumulo/monitor/servlets/ShellServlet.java | 36 +- .../monitor/servlets/TServersServlet.java | 2 + .../servlets/trace/NullKeyValueIterator.java | 3 +- .../servlets/trace/ShowTraceLinkType.java | 2 + .../monitor/servlets/trace/Summary.java | 3 + .../monitor/util/celltypes/CellType.java | 5 +- .../monitor/util/celltypes/CompactionsType.java | 1 + .../monitor/util/celltypes/DateTimeType.java | 1 + .../monitor/util/celltypes/DurationType.java | 1 + .../monitor/util/celltypes/NumberType.java | 1 + .../monitor/util/celltypes/PercentageType.java | 2 + .../util/celltypes/PreciseNumberType.java | 2 + .../util/celltypes/ProgressChartType.java | 1 + .../monitor/util/celltypes/StringType.java | 2 + .../monitor/util/celltypes/TServerLinkType.java | 2 + .../monitor/util/celltypes/TableLinkType.java | 1 + .../monitor/util/celltypes/TableStateType.java | 2 + .../tracer/src/main/findbugs/exclude-filter.xml | 2 +- .../org/apache/accumulo/tracer/TraceServer.java | 2 +- .../src/main/findbugs/exclude-filter.xml | 6 + .../tserver/TabletServerResourceManager.java | 5 + .../compaction/MajorCompactionRequest.java | 5 + .../accumulo/tserver/log/MultiReader.java | 11 + .../tserver/log/TabletServerLogger.java | 4 +- .../tserver/metrics/TabletServerMBeanImpl.java | 2 +- .../metrics/TabletServerMinCMetrics.java | 2 +- .../metrics/TabletServerScanMetrics.java | 2 +- .../metrics/TabletServerUpdateMetrics.java | 2 +- .../replication/AccumuloReplicaSystem.java | 11 +- .../replication/ReplicationProcessor.java | 10 +- .../tserver/tablet/CompactionRunner.java | 12 + .../tserver/tablet/DatafileManager.java | 2 +- .../apache/accumulo/tserver/tablet/Tablet.java | 4 +- .../tserver/log/SortedLogRecoveryTest.java | 11 + .../tserver/log/TestUpgradePathForWALogs.java | 6 +- .../replication/AccumuloReplicaSystemTest.java | 5 +- .../replication/ReplicationProcessorTest.java | 9 +- .../accumulo/tserver/tablet/RootFilesTest.java | 8 +- .../accumulo/tserver/tablet/TabletTest.java | 8 +- shell/src/main/findbugs/exclude-filter.xml | 5 + .../shell/commands/ExtensionCommand.java | 5 +- .../apache/accumulo/shell/ShellConfigTest.java | 7 +- .../org/apache/accumulo/shell/ShellTest.java | 8 +- .../AccumuloReloadingVFSClassLoaderTest.java | 8 +- test/src/main/findbugs/exclude-filter.xml | 25 +- .../apache/accumulo/test/TestRandomDeletes.java | 13 + .../accumulo/test/continuous/Histogram.java | 14 + .../test/functional/CacheTestClean.java | 7 +- .../test/functional/CacheTestReader.java | 6 +- .../accumulo/test/randomwalk/bulk/Verify.java | 2 +- .../randomwalk/security/WalkingSecurity.java | 4 +- .../test/replication/MockReplicaSystem.java | 6 +- .../ReplicationTablesPrinterThread.java | 2 +- .../accumulo/fate/zookeeper/ZooLockTest.java | 2 +- .../org/apache/accumulo/harness/AccumuloIT.java | 26 +- .../accumulo/harness/MiniClusterHarness.java | 9 +- .../accumulo/harness/SharedMiniClusterIT.java | 4 +- .../org/apache/accumulo/harness/TestingKdc.java | 7 +- .../StandaloneAccumuloClusterConfiguration.java | 2 +- .../apache/accumulo/proxy/SimpleProxyBase.java | 19 +- .../apache/accumulo/test/AuditMessageIT.java | 7 +- .../accumulo/test/BulkImportVolumeIT.java | 4 +- .../accumulo/test/ConditionalWriterIT.java | 8 +- .../org/apache/accumulo/test/ExistingMacIT.java | 6 +- .../MissingWalHeaderCompletesRecoveryIT.java | 6 +- .../org/apache/accumulo/test/NamespacesIT.java | 4 +- .../test/RewriteTabletDirectoriesIT.java | 4 +- .../org/apache/accumulo/test/ShellServerIT.java | 4 +- .../accumulo/test/UserCompactionStrategyIT.java | 5 + .../apache/accumulo/test/VolumeChooserIT.java | 6 +- .../java/org/apache/accumulo/test/VolumeIT.java | 8 +- .../test/functional/ConfigurableMacIT.java | 9 +- .../accumulo/test/functional/DeleteIT.java | 15 +- .../functional/DeleteTableDuringSplitIT.java | 4 +- .../test/functional/HalfDeadTServerIT.java | 6 +- .../accumulo/test/functional/KerberosIT.java | 16 +- .../test/functional/MonitorLoggingIT.java | 2 +- .../accumulo/test/functional/MonitorSslIT.java | 4 +- .../accumulo/test/functional/PermissionsIT.java | 10 +- .../accumulo/test/functional/ScanIdIT.java | 9 +- .../apache/accumulo/test/functional/SslIT.java | 2 +- .../accumulo/test/functional/ZooCacheIT.java | 6 +- .../test/replication/CyclicReplicationIT.java | 10 +- ...bageCollectorCommunicatesWithTServersIT.java | 2 +- .../replication/MultiInstanceReplicationIT.java | 6 +- .../test/replication/ReplicationIT.java | 8 +- .../test/replication/StatusCombinerMacIT.java | 4 +- .../UnorderedWorkAssignerReplicationIT.java | 6 +- ...UnusedWalDoesntCloseReplicationStatusIT.java | 11 +- .../test/security/KerberosTokenTest.java | 12 +- .../apache/accumulo/test/util/CertUtils.java | 20 +- .../accumulo/test/util/CertUtilsTest.java | 32 +- trace/src/main/findbugs/exclude-filter.xml | 5 +- 226 files changed, 4736 insertions(+), 3008 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java index 0000000,898e3d4..e973ebc mode 000000,100644..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java @@@ -1,0 -1,216 +1,225 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.server.replication; + + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.protobuf.ProtobufUtil; + import org.apache.accumulo.server.replication.proto.Replication.Status; + import org.apache.accumulo.server.replication.proto.Replication.Status.Builder; + + import com.google.protobuf.InvalidProtocolBufferException; + + /** + * Helper methods to create Status protobuf messages + */ + public class StatusUtil { + + private static final Status INF_END_REPLICATION_STATUS, CLOSED_STATUS; + private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE; + + private static final Status.Builder CREATED_STATUS_BUILDER; ++ private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER; + + static { + CREATED_STATUS_BUILDER = Status.newBuilder(); + CREATED_STATUS_BUILDER.setBegin(0); + CREATED_STATUS_BUILDER.setEnd(0); + CREATED_STATUS_BUILDER.setInfiniteEnd(false); + CREATED_STATUS_BUILDER.setClosed(false); + + Builder builder = Status.newBuilder(); + builder.setBegin(0); + builder.setEnd(0); + builder.setInfiniteEnd(true); + builder.setClosed(false); ++ INF_END_REPLICATION_STATUS_BUILDER = builder; + INF_END_REPLICATION_STATUS = builder.build(); + INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS); + + builder = Status.newBuilder(); + builder.setBegin(0); + builder.setEnd(0); + builder.setInfiniteEnd(true); + builder.setClosed(true); + CLOSED_STATUS = builder.build(); + CLOSED_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_STATUS); + } + + /** + * Creates a {@link Status} for newly-created data that must be replicated + * + * @param recordsIngested + * Offset of records which need to be replicated + * @return A {@link Status} tracking data that must be replicated + */ + public static Status ingestedUntil(long recordsIngested) { + return ingestedUntil(Status.newBuilder(), recordsIngested); + } + + public static Status ingestedUntil(Builder builder, long recordsIngested) { + return replicatedAndIngested(builder, 0, recordsIngested); + } + + /** + * @param recordsReplicated + * Offset of records which have been replicated + * @return A {@link Status} tracking data that must be replicated + */ + public static Status replicated(long recordsReplicated) { + return replicated(Status.newBuilder(), recordsReplicated); + } + + /** + * @param builder + * Existing {@link Builder} to use + * @param recordsReplicated + * Offset of records which have been replicated + * @returnA {@link Status} tracking data that must be replicated + */ + public static Status replicated(Status.Builder builder, long recordsReplicated) { + return replicatedAndIngested(builder, recordsReplicated, 0); + } + + /** + * Creates a @{link Status} for a file which has new data and data which has been replicated + * + * @param recordsReplicated + * Offset of records which have been replicated + * @param recordsIngested + * Offset for records which need to be replicated + * @return A {@link Status} for the given parameters + */ + public static Status replicatedAndIngested(long recordsReplicated, long recordsIngested) { + return replicatedAndIngested(Status.newBuilder(), recordsReplicated, recordsIngested); + } + + /** + * Same as {@link #replicatedAndIngested(long, long)} but uses the provided {@link Builder} + * + * @param builder + * An existing builder + * @param recordsReplicated + * Offset of records which have been replicated + * @param recordsIngested + * Offset of records which need to be replicated + * @return A {@link Status} for the given parameters using the builder + */ + public static Status replicatedAndIngested(Status.Builder builder, long recordsReplicated, long recordsIngested) { + return builder.setBegin(recordsReplicated).setEnd(recordsIngested).setClosed(false).setInfiniteEnd(false).build(); + } + + /** + * @return A {@link Status} for a new file that was just created + */ + public static synchronized Status fileCreated(long timeCreated) { + // We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable) + CREATED_STATUS_BUILDER.setCreatedTime(timeCreated); + return CREATED_STATUS_BUILDER.build(); + } + + /** + * @return A {@link Value} for a new file that was just created + */ + public static Value fileCreatedValue(long timeCreated) { + return ProtobufUtil.toValue(fileCreated(timeCreated)); + } + + /** + * @return A Status representing a closed file + */ + public static Status fileClosed() { + return CLOSED_STATUS; + } + + /** + * @return A Value representing a closed file + */ + public static Value fileClosedValue() { + return CLOSED_STATUS_VALUE; + } + + /** + * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. + */ ++ public static synchronized Status openWithUnknownLength(long timeCreated) { ++ return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build(); ++ } ++ ++ /** ++ * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. ++ */ + public static Status openWithUnknownLength() { + return INF_END_REPLICATION_STATUS; + } + + /** + * @return A {@link Value} for an open file of unspecified length, all of which needs replicating. + */ + public static Value openWithUnknownLengthValue() { + return INF_END_REPLICATION_STATUS_VALUE; + } + + /** + * @param v + * Value with serialized Status + * @return A Status created from the Value + */ + public static Status fromValue(Value v) throws InvalidProtocolBufferException { + return Status.parseFrom(v.get()); + } + + /** + * Is the given Status fully replicated and is its file ready for deletion on the source + * + * @param status + * a Status protobuf + * @return True if the file this Status references can be deleted. + */ + public static boolean isSafeForRemoval(Status status) { + return status.getClosed() && isFullyReplicated(status); + } + + /** + * Is the given Status fully replicated but potentially not yet safe for deletion + * + * @param status + * a Status protobuf + * @return True if the file this Status references is fully replicated so far + */ + public static boolean isFullyReplicated(Status status) { + if (status.getInfiniteEnd()) { + return Long.MAX_VALUE == status.getBegin(); + } else { + return status.getBegin() >= status.getEnd(); + } + } + + /** + * Given the {@link Status}, is there replication work to be done + * + * @param status + * Status for a file + * @return true if replication work is required + */ + public static boolean isWorkRequired(Status status) { + if (status.getInfiniteEnd()) { + return Long.MAX_VALUE != status.getBegin(); + } else { + return status.getBegin() < status.getEnd(); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index f44a9d1,1735c0d..cf068ed --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -47,25 -46,26 +47,25 @@@ import org.apache.accumulo.core.protobu import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTableOfflineException; - import org.apache.accumulo.core.replication.StatusUtil; - import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.AccumuloServerContext; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.master.LiveTServerSet; +import org.apache.accumulo.server.master.LiveTServerSet.Listener; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.RootTabletStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; + import org.apache.accumulo.server.replication.StatusUtil; + import org.apache.accumulo.server.replication.proto.Replication.Status; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.thrift.TException; +import org.apache.hadoop.io.Text; +import org.apache.htrace.Span; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index 8185f23,3a32727..6686cb8 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@@ -37,13 -37,13 +37,11 @@@ import org.apache.accumulo.core.file.rf import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.replication.ReplicationTable; - import org.apache.accumulo.core.replication.StatusUtil; - import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --cc server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index f47f14b,23db83a..9fcfec9 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@@ -54,9 -54,10 +54,7 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; - import org.apache.accumulo.core.replication.StatusUtil; - import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.conf.ServerConfigurationFactory; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 4a37f86,ad0598d..f897162 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -421,12 -422,9 +422,12 @@@ public class Master extends AccumuloSer perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ); } perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE); + + // add the currlog location for root tablet current logs + zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP); haveUpgradedZooKeeper = true; } catch (Exception ex) { - log.fatal("Error performing upgrade", ex); + log.error("Error performing upgrade", ex); System.exit(1); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java ---------------------------------------------------------------------- diff --cc server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java index 0000000,a127dcd..1d8eeec mode 000000,100644..100644 --- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java @@@ -1,0 -1,459 +1,452 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.master; + -import java.util.Arrays; + import java.util.Map.Entry; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.atomic.AtomicBoolean; + + import org.apache.accumulo.core.client.AccumuloException; + import org.apache.accumulo.core.client.AccumuloSecurityException; + import org.apache.accumulo.core.client.BatchWriter; + import org.apache.accumulo.core.client.BatchWriterConfig; + import org.apache.accumulo.core.client.ClientConfiguration; + import org.apache.accumulo.core.client.Connector; + import org.apache.accumulo.core.client.Instance; + import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.client.impl.ClientContext; + import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl; + import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; + import org.apache.accumulo.core.client.mock.MockInstance; + import org.apache.accumulo.core.client.security.tokens.PasswordToken; + import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.KeyExtent; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.metadata.MetadataTable; + import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; + import org.apache.accumulo.core.protobuf.ProtobufUtil; + import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; + import org.apache.accumulo.core.replication.ReplicationTable; + import org.apache.accumulo.core.security.Authorizations; + import org.apache.accumulo.core.security.Credentials; + import org.apache.accumulo.core.security.thrift.TCredentials; + import org.apache.accumulo.core.tabletserver.log.LogEntry; + import org.apache.accumulo.core.trace.thrift.TInfo; + import org.apache.accumulo.server.replication.proto.Replication.Status; + import org.apache.hadoop.io.Text; + import org.apache.thrift.TException; + import org.easymock.EasyMock; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Rule; + import org.junit.Test; + import org.junit.rules.TestName; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class ReplicationOperationsImplTest { + private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplTest.class); + + private MockInstance inst; + + @Rule + public TestName test = new TestName(); + + @Before + public void setup() { + inst = new MockInstance(test.getMethodName()); + } + + /** + * Spoof out the Master so we can call the implementation without starting a full instance. + */ + private ReplicationOperationsImpl getReplicationOperations(ClientContext context) throws Exception { + Master master = EasyMock.createMock(Master.class); + EasyMock.expect(master.getConnector()).andReturn(inst.getConnector("root", new PasswordToken(""))).anyTimes(); + EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes(); + EasyMock.replay(master); + + final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) { + @Override + protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException { + try { + return inst.getConnector("root", new PasswordToken("")).tableOperations().tableIdMap().get(tableName); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + return new ReplicationOperationsImpl(context) { + @Override + protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set wals) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + try { + return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals); + } catch (TException e) { + throw new RuntimeException(e); + } + } + }; + } + + @Test + public void waitsUntilEntriesAreReplicated() throws Exception { + Connector conn = inst.getConnector("root", new PasswordToken("")); + conn.tableOperations().create("foo"); + Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(file2); + StatusSection.add(m, tableId, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat)); + + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat)); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration()); + final ReplicationOperationsImpl roi = getReplicationOperations(context); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId); + bw.addMutation(m); + bw.flush(); + + Assert.assertFalse(done.get()); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.putDelete(ReplicationSection.COLF, tableId); + bw.addMutation(m); + bw.flush(); + bw.close(); + + // Removing metadata entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId); + bw.addMutation(m); + bw.flush(); + + Assert.assertFalse(done.get()); + + m = new Mutation(file2); + m.putDelete(StatusSection.NAME, tableId); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // After both metadata and replication + Assert.assertTrue("Drain never finished", done.get()); + Assert.assertFalse("Saw unexpectetd exception", exception.get()); + } + + @Test + public void unrelatedReplicationRecordsDontBlockDrain() throws Exception { + Connector conn = inst.getConnector("root", new PasswordToken("")); + conn.tableOperations().create("foo"); + conn.tableOperations().create("bar"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(file2); + StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat)); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration()); + + final ReplicationOperationsImpl roi = getReplicationOperations(context); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId1); + bw.addMutation(m); + bw.flush(); + + // Removing metadata entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId1); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // After both metadata and replication + Assert.assertTrue("Drain never completed", done.get()); + Assert.assertFalse("Saw unexpected exception", exception.get()); + } + + @Test + public void inprogressReplicationRecordsBlockExecution() throws Exception { + Connector conn = inst.getConnector("root", new PasswordToken("")); + conn.tableOperations().create("foo"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + bw.close(); + - LogEntry logEntry = new LogEntry(); - logEntry.extent = new KeyExtent(new Text(tableId1), null, null); - logEntry.server = "tserver"; - logEntry.filename = file1; - logEntry.tabletId = 1; - logEntry.logSet = Arrays.asList(file1); - logEntry.timestamp = System.currentTimeMillis(); ++ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(logEntry.getRow()); + m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue()); + bw.addMutation(m); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration()); + final ReplicationOperationsImpl roi = getReplicationOperations(context); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build(); + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus)); + bw.addMutation(m); + bw.flush(); + + // Removing metadata entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus)); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // New records, but not fully replicated ones don't cause it to complete + Assert.assertFalse("Drain somehow finished", done.get()); + Assert.assertFalse("Saw unexpected exception", exception.get()); + } + + @Test + public void laterCreatedLogsDontBlockExecution() throws Exception { + Connector conn = inst.getConnector("root", new PasswordToken("")); + conn.tableOperations().create("foo"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + bw.close(); + + System.out.println("Reading metadata first time"); + for (Entry e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + System.out.println(e.getKey()); + } + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration()); + final ReplicationOperationsImpl roi = getReplicationOperations(context); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // We need to wait long enough for the table to read once + Thread.sleep(2000); + + // Write another file, but also delete the old files + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID()); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId1); + bw.addMutation(m); + bw.close(); + + System.out.println("Reading metadata second time"); + for (Entry e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + System.out.println(e.getKey()); + } + + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId1); + bw.addMutation(m); + bw.close(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did) + Assert.assertTrue("Drain didn't finish", done.get()); + } + + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/findbugs/exclude-filter.xml ---------------------------------------------------------------------- diff --cc server/tserver/src/main/findbugs/exclude-filter.xml index 45f6a78,47dd1f5..a334163 --- a/server/tserver/src/main/findbugs/exclude-filter.xml +++ b/server/tserver/src/main/findbugs/exclude-filter.xml @@@ -18,7 -18,13 +18,13 @@@ - + + + + + + + http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 498cbdd,254e5d6..64d2052 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -38,9 -37,6 +38,7 @@@ import org.apache.accumulo.core.data.Ke import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; - import org.apache.accumulo.core.replication.StatusUtil; - import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java index 685d71a,b78a311..27f1f69 --- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java @@@ -16,7 -16,10 +16,9 @@@ */ package org.apache.accumulo.test; + import static java.nio.charset.StandardCharsets.UTF_8; + import java.io.File; -import java.util.Collections; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 9af5445,5b89d9c..fbe6900 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@@ -38,10 -38,8 +38,9 @@@ import org.apache.accumulo.core.data.Va import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationTable; - import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java index 718cda1,54348db..ca58a59 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java @@@ -44,10 -43,8 +44,9 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.TableOperations; - import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@@ -78,10 -71,15 +74,14 @@@ import org.apache.accumulo.fate.zookeep import org.apache.accumulo.gc.SimpleGarbageCollector; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.master.state.TServerInstance; + import org.apache.accumulo.server.replication.ReplicaSystemFactory; import org.apache.accumulo.server.replication.StatusCombiner; + import org.apache.accumulo.server.replication.StatusFormatter; + import org.apache.accumulo.server.replication.StatusUtil; + import org.apache.accumulo.server.replication.proto.Replication.Status; import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path;