accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [11/34] accumulo git commit: ACCUMULO-3423 merging master
Date Fri, 24 Apr 2015 23:20:57 GMT
ACCUMULO-3423 merging master


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1591f03
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1591f03
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1591f03

Branch: refs/heads/master
Commit: f1591f033ed010f490686b836fe6d6d52cb98238
Parents: f4f28c7 5c670fa
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Mar 17 08:49:11 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Mar 17 08:49:11 2015 -0400

----------------------------------------------------------------------
 TESTING.md                                      |   28 +-
 core/pom.xml                                    |   27 -
 core/src/main/findbugs/exclude-filter.xml       |   46 +-
 .../accumulo/core/bloomfilter/BloomFilter.java  |    3 -
 .../client/admin/ReplicationOperations.java     |   13 +-
 .../core/client/impl/ClientContext.java         |    4 +-
 .../core/client/impl/ConditionalWriterImpl.java |    9 +-
 .../client/impl/NamespaceOperationsImpl.java    |    8 +-
 .../client/impl/ReplicationOperationsImpl.java  |  125 +-
 .../core/client/impl/TableOperationsImpl.java   |   17 +
 .../core/client/impl/TabletLocator.java         |    1 -
 .../client/mapreduce/AbstractInputFormat.java   |   16 +-
 .../core/client/replication/ReplicaSystem.java  |   50 -
 .../replication/ReplicaSystemFactory.java       |   82 --
 .../tokens/CredentialProviderToken.java         |    2 +-
 .../client/security/tokens/DelegationToken.java |    2 +-
 .../client/security/tokens/KerberosToken.java   |    7 +-
 .../core/client/security/tokens/NullToken.java  |    6 +-
 .../accumulo/core/conf/SiteConfiguration.java   |    4 +-
 .../core/file/blockfile/cache/CachedBlock.java  |   14 +
 .../core/file/blockfile/cache/ClassSize.java    |   47 +-
 .../file/blockfile/cache/LruBlockCache.java     |    6 +
 .../accumulo/core/iterators/OrIterator.java     |   12 +
 .../core/iterators/user/AgeOffFilter.java       |    1 -
 .../core/iterators/user/TimestampFilter.java    |    4 +-
 .../user/WholeColumnFamilyIterator.java         |    6 +-
 .../core/master/thrift/MasterClientService.java | 1213 ++++++++++++++++++
 .../replication/PrintReplicationRecords.java    |   97 --
 .../core/replication/ReplicaSystemHelper.java   |   72 --
 .../core/replication/ReplicationTable.java      |    1 -
 .../core/replication/StatusFormatter.java       |  187 ---
 .../accumulo/core/replication/StatusUtil.java   |  225 ----
 .../core/replication/proto/Replication.java     |  949 --------------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |    8 +-
 .../CachingHDFSSecretKeyEncryptionStrategy.java |    2 +-
 .../apache/accumulo/core/util/CreateToken.java  |    6 +-
 .../org/apache/accumulo/core/util/Version.java  |    6 +-
 .../core/util/format/DefaultFormatter.java      |   35 +-
 core/src/main/protobuf/replication.proto        |   26 -
 core/src/main/scripts/generate-protobuf.sh      |   98 --
 core/src/main/thrift/master.thrift              |    3 +
 .../accumulo/core/cli/TestClientOpts.java       |    6 +-
 .../client/impl/TableOperationsHelperTest.java  |    4 +-
 .../mapred/AccumuloFileOutputFormatTest.java    |   15 +-
 .../mapred/AccumuloRowInputFormatTest.java      |    8 +-
 .../mapreduce/AccumuloFileOutputFormatTest.java |   11 +-
 .../mapreduce/AccumuloRowInputFormatTest.java   |    8 +-
 .../conf/CredentialProviderFactoryShimTest.java |    8 +-
 .../iterators/aggregation/NumSummationTest.java |   17 +-
 .../ReplicationOperationsImplTest.java          |  409 ------
 .../core/replication/StatusUtilTest.java        |   57 -
 .../core/replication/proto/StatusTest.java      |   36 -
 .../core/util/LocalityGroupUtilTest.java        |    4 +-
 .../core/util/format/HexFormatterTest.java      |    2 +-
 .../simple/src/main/findbugs/exclude-filter.xml |    5 +
 .../simple/client/RandomBatchScanner.java       |    3 +-
 .../examples/simple/client/RowOperations.java   |   30 +-
 .../simple/filedata/ChunkInputFormatTest.java   |   16 +-
 .../simple/filedata/ChunkInputStreamTest.java   |   40 +-
 fate/src/main/findbugs/exclude-filter.xml       |    5 +
 .../apache/accumulo/maven/plugin/StartMojo.java |    4 +-
 .../impl/MiniAccumuloClusterImpl.java           |   28 +-
 .../MiniAccumuloClusterStartStopTest.java       |    6 +-
 .../minicluster/MiniAccumuloClusterTest.java    |    5 +-
 .../impl/MiniAccumuloClusterImplTest.java       |    6 +-
 pom.xml                                         |    2 +-
 proxy/src/main/findbugs/exclude-filter.xml      |    7 +-
 .../org/apache/accumulo/proxy/ProxyServer.java  |   30 +-
 server/base/pom.xml                             |   29 +
 .../base/src/main/findbugs/exclude-filter.xml   |   12 +
 .../server/constraints/MetadataConstraints.java |    2 +-
 .../accumulo/server/data/ServerMutation.java    |   13 +-
 .../apache/accumulo/server/fs/VolumeUtil.java   |    4 +-
 .../apache/accumulo/server/init/Initialize.java |    3 +-
 .../iterators/MetadataBulkLoadFilter.java       |    2 +-
 .../accumulo/server/master/LiveTServerSet.java  |   25 +-
 .../master/balancer/DefaultLoadBalancer.java    |   11 +
 .../server/metrics/AbstractMetricsImpl.java     |    5 +-
 .../replication/PrintReplicationRecords.java    |   98 ++
 .../server/replication/ReplicaSystem.java       |   49 +
 .../replication/ReplicaSystemFactory.java       |   82 ++
 .../server/replication/ReplicaSystemHelper.java |   74 ++
 .../server/replication/ReplicationUtil.java     |    6 +-
 .../server/replication/StatusCombiner.java      |    6 +-
 .../server/replication/StatusFormatter.java     |  170 +++
 .../accumulo/server/replication/StatusUtil.java |  225 ++++
 .../server/replication/proto/Replication.java   |  949 ++++++++++++++
 .../org/apache/accumulo/server/util/Admin.java  |   16 +-
 .../server/util/ReplicationTableUtil.java       |    4 +-
 server/base/src/main/protobuf/replication.proto |   26 +
 .../base/src/main/scripts/generate-protobuf.sh  |   98 ++
 .../master/balancer/GroupBalancerTest.java      |    2 +-
 .../master/balancer/TableLoadBalancerTest.java  |    8 +-
 .../server/replication/StatusCombinerTest.java  |    6 +-
 .../server/replication/StatusUtilTest.java      |   54 +
 .../server/replication/proto/StatusTest.java    |   36 +
 .../server/security/SystemCredentialsTest.java  |    8 +-
 .../accumulo/server/util/FileUtilTest.java      |   39 +-
 .../server/util/ReplicationTableUtilTest.java   |    7 +-
 server/gc/src/main/findbugs/exclude-filter.xml  |    5 +
 .../gc/GarbageCollectWriteAheadLogs.java        |    4 +-
 .../accumulo/gc/GarbageCollectionAlgorithm.java |    4 +-
 .../gc/GarbageCollectionEnvironment.java        |    2 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |    2 +-
 .../CloseWriteAheadLogReferences.java           |    4 +-
 .../accumulo/gc/GarbageCollectionTest.java      |    4 +-
 .../CloseWriteAheadLogReferencesTest.java       |    4 +-
 .../master/src/main/findbugs/exclude-filter.xml |   20 +
 .../accumulo/master/FateServiceHandler.java     |    2 +-
 .../java/org/apache/accumulo/master/Master.java |   13 +-
 .../master/MasterClientServiceHandler.java      |  111 +-
 .../accumulo/master/TabletGroupWatcher.java     |    3 +-
 .../DistributedWorkQueueWorkAssigner.java       |    5 +-
 .../master/replication/FinishedWorkUpdater.java |    4 +-
 .../RemoveCompleteReplicationRecords.java       |    4 +-
 .../master/replication/StatusMaker.java         |    2 +-
 .../accumulo/master/replication/WorkMaker.java  |    4 +-
 .../accumulo/master/tableOps/BulkImport.java    |    7 +-
 .../accumulo/master/tableOps/ImportTable.java   |   39 +-
 .../accumulo/master/tableOps/TraceRepo.java     |    2 +-
 .../master/ReplicationOperationsImplTest.java   |  452 +++++++
 .../replication/FinishedWorkUpdaterTest.java    |    5 +-
 .../RemoveCompleteReplicationRecordsTest.java   |    7 +-
 .../replication/SequentialWorkAssignerTest.java |    5 +-
 .../master/replication/StatusMakerTest.java     |    7 +-
 .../replication/UnorderedWorkAssignerTest.java  |    4 +-
 .../master/replication/WorkMakerTest.java       |    7 +-
 .../accumulo/monitor/ZooKeeperStatus.java       |   11 +
 .../accumulo/monitor/servlets/BasicServlet.java |    6 +-
 .../accumulo/monitor/servlets/JSONServlet.java  |    4 +-
 .../accumulo/monitor/servlets/LogServlet.java   |    2 +
 .../monitor/servlets/ProblemServlet.java        |   17 +-
 .../monitor/servlets/ReplicationServlet.java    |   25 +-
 .../accumulo/monitor/servlets/ShellServlet.java |   36 +-
 .../monitor/servlets/TServersServlet.java       |    2 +
 .../servlets/trace/NullKeyValueIterator.java    |    3 +-
 .../servlets/trace/ShowTraceLinkType.java       |    2 +
 .../monitor/servlets/trace/Summary.java         |    3 +
 .../monitor/util/celltypes/CellType.java        |    5 +-
 .../monitor/util/celltypes/CompactionsType.java |    1 +
 .../monitor/util/celltypes/DateTimeType.java    |    1 +
 .../monitor/util/celltypes/DurationType.java    |    1 +
 .../monitor/util/celltypes/NumberType.java      |    1 +
 .../monitor/util/celltypes/PercentageType.java  |    2 +
 .../util/celltypes/PreciseNumberType.java       |    2 +
 .../util/celltypes/ProgressChartType.java       |    1 +
 .../monitor/util/celltypes/StringType.java      |    2 +
 .../monitor/util/celltypes/TServerLinkType.java |    2 +
 .../monitor/util/celltypes/TableLinkType.java   |    1 +
 .../monitor/util/celltypes/TableStateType.java  |    2 +
 .../tracer/src/main/findbugs/exclude-filter.xml |    2 +-
 .../org/apache/accumulo/tracer/TraceServer.java |    2 +-
 .../src/main/findbugs/exclude-filter.xml        |    6 +
 .../tserver/TabletServerResourceManager.java    |    5 +
 .../compaction/MajorCompactionRequest.java      |    5 +
 .../accumulo/tserver/log/MultiReader.java       |   11 +
 .../tserver/log/TabletServerLogger.java         |    4 +-
 .../tserver/metrics/TabletServerMBeanImpl.java  |    2 +-
 .../metrics/TabletServerMinCMetrics.java        |    2 +-
 .../metrics/TabletServerScanMetrics.java        |    2 +-
 .../metrics/TabletServerUpdateMetrics.java      |    2 +-
 .../replication/AccumuloReplicaSystem.java      |   11 +-
 .../replication/ReplicationProcessor.java       |   10 +-
 .../tserver/tablet/CompactionRunner.java        |   12 +
 .../tserver/tablet/DatafileManager.java         |    2 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |    4 +-
 .../tserver/log/SortedLogRecoveryTest.java      |   11 +
 .../tserver/log/TestUpgradePathForWALogs.java   |    6 +-
 .../replication/AccumuloReplicaSystemTest.java  |    5 +-
 .../replication/ReplicationProcessorTest.java   |    9 +-
 .../accumulo/tserver/tablet/RootFilesTest.java  |    8 +-
 .../accumulo/tserver/tablet/TabletTest.java     |    8 +-
 shell/src/main/findbugs/exclude-filter.xml      |    5 +
 .../shell/commands/ExtensionCommand.java        |    5 +-
 .../apache/accumulo/shell/ShellConfigTest.java  |    7 +-
 .../org/apache/accumulo/shell/ShellTest.java    |    8 +-
 .../AccumuloReloadingVFSClassLoaderTest.java    |    8 +-
 test/src/main/findbugs/exclude-filter.xml       |   25 +-
 .../apache/accumulo/test/TestRandomDeletes.java |   13 +
 .../accumulo/test/continuous/Histogram.java     |   14 +
 .../test/functional/CacheTestClean.java         |    7 +-
 .../test/functional/CacheTestReader.java        |    6 +-
 .../accumulo/test/randomwalk/bulk/Verify.java   |    2 +-
 .../randomwalk/security/WalkingSecurity.java    |    4 +-
 .../test/replication/MockReplicaSystem.java     |    6 +-
 .../ReplicationTablesPrinterThread.java         |    2 +-
 .../accumulo/fate/zookeeper/ZooLockTest.java    |    2 +-
 .../org/apache/accumulo/harness/AccumuloIT.java |   26 +-
 .../accumulo/harness/MiniClusterHarness.java    |    9 +-
 .../accumulo/harness/SharedMiniClusterIT.java   |    4 +-
 .../org/apache/accumulo/harness/TestingKdc.java |    7 +-
 .../StandaloneAccumuloClusterConfiguration.java |    2 +-
 .../apache/accumulo/proxy/SimpleProxyBase.java  |   19 +-
 .../apache/accumulo/test/AuditMessageIT.java    |    7 +-
 .../accumulo/test/BulkImportVolumeIT.java       |    4 +-
 .../accumulo/test/ConditionalWriterIT.java      |    8 +-
 .../org/apache/accumulo/test/ExistingMacIT.java |    6 +-
 .../MissingWalHeaderCompletesRecoveryIT.java    |    6 +-
 .../org/apache/accumulo/test/NamespacesIT.java  |    4 +-
 .../test/RewriteTabletDirectoriesIT.java        |    4 +-
 .../org/apache/accumulo/test/ShellServerIT.java |    4 +-
 .../accumulo/test/UserCompactionStrategyIT.java |    5 +
 .../apache/accumulo/test/VolumeChooserIT.java   |    6 +-
 .../java/org/apache/accumulo/test/VolumeIT.java |    8 +-
 .../test/functional/ConfigurableMacIT.java      |    9 +-
 .../accumulo/test/functional/DeleteIT.java      |   15 +-
 .../functional/DeleteTableDuringSplitIT.java    |    4 +-
 .../test/functional/HalfDeadTServerIT.java      |    6 +-
 .../accumulo/test/functional/KerberosIT.java    |   16 +-
 .../test/functional/MonitorLoggingIT.java       |    2 +-
 .../accumulo/test/functional/MonitorSslIT.java  |    4 +-
 .../accumulo/test/functional/PermissionsIT.java |   10 +-
 .../accumulo/test/functional/ScanIdIT.java      |    9 +-
 .../apache/accumulo/test/functional/SslIT.java  |    2 +-
 .../accumulo/test/functional/ZooCacheIT.java    |    6 +-
 .../test/replication/CyclicReplicationIT.java   |   10 +-
 ...bageCollectorCommunicatesWithTServersIT.java |    2 +-
 .../replication/MultiInstanceReplicationIT.java |    6 +-
 .../test/replication/ReplicationIT.java         |    8 +-
 .../test/replication/StatusCombinerMacIT.java   |    4 +-
 .../UnorderedWorkAssignerReplicationIT.java     |    6 +-
 ...UnusedWalDoesntCloseReplicationStatusIT.java |   11 +-
 .../test/security/KerberosTokenTest.java        |   12 +-
 .../apache/accumulo/test/util/CertUtils.java    |   20 +-
 .../accumulo/test/util/CertUtilsTest.java       |   32 +-
 trace/src/main/findbugs/exclude-filter.xml      |    5 +-
 226 files changed, 4736 insertions(+), 3008 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index 0000000,898e3d4..e973ebc
mode 000000,100644..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@@ -1,0 -1,216 +1,225 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.server.replication;
+ 
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.protobuf.ProtobufUtil;
+ import org.apache.accumulo.server.replication.proto.Replication.Status;
+ import org.apache.accumulo.server.replication.proto.Replication.Status.Builder;
+ 
+ import com.google.protobuf.InvalidProtocolBufferException;
+ 
+ /**
+  * Helper methods to create Status protobuf messages
+  */
+ public class StatusUtil {
+ 
+   private static final Status INF_END_REPLICATION_STATUS, CLOSED_STATUS;
+   private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
+ 
+   private static final Status.Builder CREATED_STATUS_BUILDER;
++  private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER;
+ 
+   static {
+     CREATED_STATUS_BUILDER = Status.newBuilder();
+     CREATED_STATUS_BUILDER.setBegin(0);
+     CREATED_STATUS_BUILDER.setEnd(0);
+     CREATED_STATUS_BUILDER.setInfiniteEnd(false);
+     CREATED_STATUS_BUILDER.setClosed(false);
+ 
+     Builder builder = Status.newBuilder();
+     builder.setBegin(0);
+     builder.setEnd(0);
+     builder.setInfiniteEnd(true);
+     builder.setClosed(false);
++    INF_END_REPLICATION_STATUS_BUILDER = builder;
+     INF_END_REPLICATION_STATUS = builder.build();
+     INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
+ 
+     builder = Status.newBuilder();
+     builder.setBegin(0);
+     builder.setEnd(0);
+     builder.setInfiniteEnd(true);
+     builder.setClosed(true);
+     CLOSED_STATUS = builder.build();
+     CLOSED_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_STATUS);
+   }
+ 
+   /**
+    * Creates a {@link Status} for newly-created data that must be replicated
+    *
+    * @param recordsIngested
+    *          Offset of records which need to be replicated
+    * @return A {@link Status} tracking data that must be replicated
+    */
+   public static Status ingestedUntil(long recordsIngested) {
+     return ingestedUntil(Status.newBuilder(), recordsIngested);
+   }
+ 
+   public static Status ingestedUntil(Builder builder, long recordsIngested) {
+     return replicatedAndIngested(builder, 0, recordsIngested);
+   }
+ 
+   /**
+    * @param recordsReplicated
+    *          Offset of records which have been replicated
+    * @return A {@link Status} tracking data that must be replicated
+    */
+   public static Status replicated(long recordsReplicated) {
+     return replicated(Status.newBuilder(), recordsReplicated);
+   }
+ 
+   /**
+    * @param builder
+    *          Existing {@link Builder} to use
+    * @param recordsReplicated
+    *          Offset of records which have been replicated
+    * @returnA {@link Status} tracking data that must be replicated
+    */
+   public static Status replicated(Status.Builder builder, long recordsReplicated) {
+     return replicatedAndIngested(builder, recordsReplicated, 0);
+   }
+ 
+   /**
+    * Creates a @{link Status} for a file which has new data and data which has been replicated
+    *
+    * @param recordsReplicated
+    *          Offset of records which have been replicated
+    * @param recordsIngested
+    *          Offset for records which need to be replicated
+    * @return A {@link Status} for the given parameters
+    */
+   public static Status replicatedAndIngested(long recordsReplicated, long recordsIngested) {
+     return replicatedAndIngested(Status.newBuilder(), recordsReplicated, recordsIngested);
+   }
+ 
+   /**
+    * Same as {@link #replicatedAndIngested(long, long)} but uses the provided {@link Builder}
+    *
+    * @param builder
+    *          An existing builder
+    * @param recordsReplicated
+    *          Offset of records which have been replicated
+    * @param recordsIngested
+    *          Offset of records which need to be replicated
+    * @return A {@link Status} for the given parameters using the builder
+    */
+   public static Status replicatedAndIngested(Status.Builder builder, long recordsReplicated, long recordsIngested) {
+     return builder.setBegin(recordsReplicated).setEnd(recordsIngested).setClosed(false).setInfiniteEnd(false).build();
+   }
+ 
+   /**
+    * @return A {@link Status} for a new file that was just created
+    */
+   public static synchronized Status fileCreated(long timeCreated) {
+     // We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable)
+     CREATED_STATUS_BUILDER.setCreatedTime(timeCreated);
+     return CREATED_STATUS_BUILDER.build();
+   }
+ 
+   /**
+    * @return A {@link Value} for a new file that was just created
+    */
+   public static Value fileCreatedValue(long timeCreated) {
+     return ProtobufUtil.toValue(fileCreated(timeCreated));
+   }
+ 
+   /**
+    * @return A Status representing a closed file
+    */
+   public static Status fileClosed() {
+     return CLOSED_STATUS;
+   }
+ 
+   /**
+    * @return A Value representing a closed file
+    */
+   public static Value fileClosedValue() {
+     return CLOSED_STATUS_VALUE;
+   }
+ 
+   /**
+    * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+    */
++  public static synchronized Status openWithUnknownLength(long timeCreated) {
++    return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build();
++  }
++
++  /**
++   * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
++   */
+   public static Status openWithUnknownLength() {
+     return INF_END_REPLICATION_STATUS;
+   }
+ 
+   /**
+    * @return A {@link Value} for an open file of unspecified length, all of which needs replicating.
+    */
+   public static Value openWithUnknownLengthValue() {
+     return INF_END_REPLICATION_STATUS_VALUE;
+   }
+ 
+   /**
+    * @param v
+    *          Value with serialized Status
+    * @return A Status created from the Value
+    */
+   public static Status fromValue(Value v) throws InvalidProtocolBufferException {
+     return Status.parseFrom(v.get());
+   }
+ 
+   /**
+    * Is the given Status fully replicated and is its file ready for deletion on the source
+    *
+    * @param status
+    *          a Status protobuf
+    * @return True if the file this Status references can be deleted.
+    */
+   public static boolean isSafeForRemoval(Status status) {
+     return status.getClosed() && isFullyReplicated(status);
+   }
+ 
+   /**
+    * Is the given Status fully replicated but potentially not yet safe for deletion
+    *
+    * @param status
+    *          a Status protobuf
+    * @return True if the file this Status references is fully replicated so far
+    */
+   public static boolean isFullyReplicated(Status status) {
+     if (status.getInfiniteEnd()) {
+       return Long.MAX_VALUE == status.getBegin();
+     } else {
+       return status.getBegin() >= status.getEnd();
+     }
+   }
+ 
+   /**
+    * Given the {@link Status}, is there replication work to be done
+    *
+    * @param status
+    *          Status for a file
+    * @return true if replication work is required
+    */
+   public static boolean isWorkRequired(Status status) {
+     if (status.getInfiniteEnd()) {
+       return Long.MAX_VALUE != status.getBegin();
+     } else {
+       return status.getBegin() < status.getEnd();
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index f44a9d1,1735c0d..cf068ed
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -47,25 -46,26 +47,25 @@@ import org.apache.accumulo.core.protobu
  import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
  import org.apache.accumulo.core.replication.ReplicationTable;
  import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
- import org.apache.accumulo.core.replication.StatusUtil;
- import org.apache.accumulo.core.replication.proto.Replication.Status;
 -import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 -import org.apache.accumulo.core.trace.Span;
  import org.apache.accumulo.core.trace.Trace;
 -import org.apache.accumulo.core.trace.Tracer;
 -import org.apache.accumulo.core.util.AddressUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
  import org.apache.accumulo.server.AccumuloServerContext;
 -import org.apache.accumulo.server.ServerConstants;
  import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.LiveTServerSet;
 +import org.apache.accumulo.server.master.LiveTServerSet.Listener;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.RootTabletStateStore;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletState;
+ import org.apache.accumulo.server.replication.StatusUtil;
+ import org.apache.accumulo.server.replication.proto.Replication.Status;
 -import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.thrift.TException;
 +import org.apache.hadoop.io.Text;
 +import org.apache.htrace.Span;
  import org.apache.zookeeper.KeeperException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 8185f23,3a32727..6686cb8
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@@ -37,13 -37,13 +37,11 @@@ import org.apache.accumulo.core.file.rf
  import org.apache.accumulo.core.master.thrift.MasterClientService;
  import org.apache.accumulo.core.metadata.MetadataTable;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
  import org.apache.accumulo.core.replication.ReplicationTable;
- import org.apache.accumulo.core.replication.StatusUtil;
- import org.apache.accumulo.core.replication.proto.Replication.Status;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.trace.Span;
  import org.apache.accumulo.core.trace.Trace;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --cc server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index f47f14b,23db83a..9fcfec9
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@@ -54,9 -54,10 +54,7 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.protobuf.ProtobufUtil;
  import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
  import org.apache.accumulo.core.replication.ReplicationTable;
- import org.apache.accumulo.core.replication.StatusUtil;
- import org.apache.accumulo.core.replication.proto.Replication.Status;
  import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.trace.thrift.TInfo;
  import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.conf.ServerConfigurationFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 4a37f86,ad0598d..f897162
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -421,12 -422,9 +422,12 @@@ public class Master extends AccumuloSer
            perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
          }
          perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
 +
 +        // add the currlog location for root tablet current logs
 +        zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
          haveUpgradedZooKeeper = true;
        } catch (Exception ex) {
-         log.fatal("Error performing upgrade", ex);
+         log.error("Error performing upgrade", ex);
          System.exit(1);
        }
      }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --cc server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 0000000,a127dcd..1d8eeec
mode 000000,100644..100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@@ -1,0 -1,459 +1,452 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.master;
+ 
 -import java.util.Arrays;
+ import java.util.Map.Entry;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
+ import org.apache.accumulo.core.client.ClientConfiguration;
+ import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.Instance;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.impl.ClientContext;
+ import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
+ import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+ import org.apache.accumulo.core.client.mock.MockInstance;
+ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.KeyExtent;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.metadata.MetadataTable;
+ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+ import org.apache.accumulo.core.protobuf.ProtobufUtil;
+ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+ import org.apache.accumulo.core.replication.ReplicationTable;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.security.Credentials;
+ import org.apache.accumulo.core.security.thrift.TCredentials;
+ import org.apache.accumulo.core.tabletserver.log.LogEntry;
+ import org.apache.accumulo.core.trace.thrift.TInfo;
+ import org.apache.accumulo.server.replication.proto.Replication.Status;
+ import org.apache.hadoop.io.Text;
+ import org.apache.thrift.TException;
+ import org.easymock.EasyMock;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.rules.TestName;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ReplicationOperationsImplTest {
+   private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplTest.class);
+ 
+   private MockInstance inst;
+ 
+   @Rule
+   public TestName test = new TestName();
+ 
+   @Before
+   public void setup() {
+     inst = new MockInstance(test.getMethodName());
+   }
+ 
+   /**
+    * Spoof out the Master so we can call the implementation without starting a full instance.
+    */
+   private ReplicationOperationsImpl getReplicationOperations(ClientContext context) throws Exception {
+     Master master = EasyMock.createMock(Master.class);
+     EasyMock.expect(master.getConnector()).andReturn(inst.getConnector("root", new PasswordToken(""))).anyTimes();
+     EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes();
+     EasyMock.replay(master);
+ 
+     final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) {
+       @Override
+       protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException {
+         try {
+           return inst.getConnector("root", new PasswordToken("")).tableOperations().tableIdMap().get(tableName);
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+       }
+     };
+ 
+     return new ReplicationOperationsImpl(context) {
+       @Override
+       protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set<String> wals)
+           throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+         try {
+           return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals);
+         } catch (TException e) {
+           throw new RuntimeException(e);
+         }
+       }
+     };
+   }
+ 
+   @Test
+   public void waitsUntilEntriesAreReplicated() throws Exception {
+     Connector conn = inst.getConnector("root", new PasswordToken(""));
+     conn.tableOperations().create("foo");
+     Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
+ 
+     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+     Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+ 
+     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ 
+     Mutation m = new Mutation(file1);
+     StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     m = new Mutation(file2);
+     StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     bw.close();
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+ 
+     bw.addMutation(m);
+ 
+     m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+     m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+ 
+     bw.close();
+ 
+     final AtomicBoolean done = new AtomicBoolean(false);
+     final AtomicBoolean exception = new AtomicBoolean(false);
+     ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+     final ReplicationOperationsImpl roi = getReplicationOperations(context);
+     Thread t = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         try {
+           roi.drain("foo");
+         } catch (Exception e) {
+           log.error("Got error", e);
+           exception.set(true);
+         }
+         done.set(true);
+       }
+     });
+ 
+     t.start();
+ 
+     // With the records, we shouldn't be drained
+     Assert.assertFalse(done.get());
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.putDelete(ReplicationSection.COLF, tableId);
+     bw.addMutation(m);
+     bw.flush();
+ 
+     Assert.assertFalse(done.get());
+ 
+     m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+     m.putDelete(ReplicationSection.COLF, tableId);
+     bw.addMutation(m);
+     bw.flush();
+     bw.close();
+ 
+     // Removing metadata entries doesn't change anything
+     Assert.assertFalse(done.get());
+ 
+     // Remove the replication entries too
+     bw = ReplicationTable.getBatchWriter(conn);
+     m = new Mutation(file1);
+     m.putDelete(StatusSection.NAME, tableId);
+     bw.addMutation(m);
+     bw.flush();
+ 
+     Assert.assertFalse(done.get());
+ 
+     m = new Mutation(file2);
+     m.putDelete(StatusSection.NAME, tableId);
+     bw.addMutation(m);
+     bw.flush();
+ 
+     try {
+       t.join(5000);
+     } catch (InterruptedException e) {
+       Assert.fail("ReplicationOperations.drain did not complete");
+     }
+ 
+     // After both metadata and replication
+     Assert.assertTrue("Drain never finished", done.get());
+     Assert.assertFalse("Saw unexpectetd exception", exception.get());
+   }
+ 
+   @Test
+   public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
+     Connector conn = inst.getConnector("root", new PasswordToken(""));
+     conn.tableOperations().create("foo");
+     conn.tableOperations().create("bar");
+ 
+     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+     Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar"));
+ 
+     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+     Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+ 
+     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ 
+     Mutation m = new Mutation(file1);
+     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     m = new Mutation(file2);
+     StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     bw.close();
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+ 
+     bw.addMutation(m);
+ 
+     m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+     m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat));
+ 
+     bw.close();
+ 
+     final AtomicBoolean done = new AtomicBoolean(false);
+     final AtomicBoolean exception = new AtomicBoolean(false);
+     ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+ 
+     final ReplicationOperationsImpl roi = getReplicationOperations(context);
+ 
+     Thread t = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         try {
+           roi.drain("foo");
+         } catch (Exception e) {
+           log.error("Got error", e);
+           exception.set(true);
+         }
+         done.set(true);
+       }
+     });
+ 
+     t.start();
+ 
+     // With the records, we shouldn't be drained
+     Assert.assertFalse(done.get());
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.putDelete(ReplicationSection.COLF, tableId1);
+     bw.addMutation(m);
+     bw.flush();
+ 
+     // Removing metadata entries doesn't change anything
+     Assert.assertFalse(done.get());
+ 
+     // Remove the replication entries too
+     bw = ReplicationTable.getBatchWriter(conn);
+     m = new Mutation(file1);
+     m.putDelete(StatusSection.NAME, tableId1);
+     bw.addMutation(m);
+     bw.flush();
+ 
+     try {
+       t.join(5000);
+     } catch (InterruptedException e) {
+       Assert.fail("ReplicationOperations.drain did not complete");
+     }
+ 
+     // After both metadata and replication
+     Assert.assertTrue("Drain never completed", done.get());
+     Assert.assertFalse("Saw unexpected exception", exception.get());
+   }
+ 
+   @Test
+   public void inprogressReplicationRecordsBlockExecution() throws Exception {
+     Connector conn = inst.getConnector("root", new PasswordToken(""));
+     conn.tableOperations().create("foo");
+ 
+     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+ 
+     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+     Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+ 
+     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ 
+     Mutation m = new Mutation(file1);
+     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+     bw.close();
+ 
 -    LogEntry logEntry = new LogEntry();
 -    logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
 -    logEntry.server = "tserver";
 -    logEntry.filename = file1;
 -    logEntry.tabletId = 1;
 -    logEntry.logSet = Arrays.asList(file1);
 -    logEntry.timestamp = System.currentTimeMillis();
++    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     m = new Mutation(logEntry.getRow());
+     m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+     bw.addMutation(m);
+ 
+     bw.close();
+ 
+     final AtomicBoolean done = new AtomicBoolean(false);
+     final AtomicBoolean exception = new AtomicBoolean(false);
+     ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+     final ReplicationOperationsImpl roi = getReplicationOperations(context);
+     Thread t = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         try {
+           roi.drain("foo");
+         } catch (Exception e) {
+           log.error("Got error", e);
+           exception.set(true);
+         }
+         done.set(true);
+       }
+     });
+ 
+     t.start();
+ 
+     // With the records, we shouldn't be drained
+     Assert.assertFalse(done.get());
+ 
+     Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build();
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus));
+     bw.addMutation(m);
+     bw.flush();
+ 
+     // Removing metadata entries doesn't change anything
+     Assert.assertFalse(done.get());
+ 
+     // Remove the replication entries too
+     bw = ReplicationTable.getBatchWriter(conn);
+     m = new Mutation(file1);
+     m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
+     bw.addMutation(m);
+     bw.flush();
+ 
+     try {
+       t.join(5000);
+     } catch (InterruptedException e) {
+       Assert.fail("ReplicationOperations.drain did not complete");
+     }
+ 
+     // New records, but not fully replicated ones don't cause it to complete
+     Assert.assertFalse("Drain somehow finished", done.get());
+     Assert.assertFalse("Saw unexpected exception", exception.get());
+   }
+ 
+   @Test
+   public void laterCreatedLogsDontBlockExecution() throws Exception {
+     Connector conn = inst.getConnector("root", new PasswordToken(""));
+     conn.tableOperations().create("foo");
+ 
+     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+ 
+     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+     Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+ 
+     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+     Mutation m = new Mutation(file1);
+     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+     bw.close();
+ 
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+ 
+     bw.close();
+ 
+     System.out.println("Reading metadata first time");
+     for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+       System.out.println(e.getKey());
+     }
+ 
+     final AtomicBoolean done = new AtomicBoolean(false);
+     final AtomicBoolean exception = new AtomicBoolean(false);
+     ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+     final ReplicationOperationsImpl roi = getReplicationOperations(context);
+     Thread t = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         try {
+           roi.drain("foo");
+         } catch (Exception e) {
+           log.error("Got error", e);
+           exception.set(true);
+         }
+         done.set(true);
+       }
+     });
+ 
+     t.start();
+ 
+     // We need to wait long enough for the table to read once
+     Thread.sleep(2000);
+ 
+     // Write another file, but also delete the old files
+     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+     m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID());
+     m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+     bw.addMutation(m);
+     m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+     m.putDelete(ReplicationSection.COLF, tableId1);
+     bw.addMutation(m);
+     bw.close();
+ 
+     System.out.println("Reading metadata second time");
+     for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+       System.out.println(e.getKey());
+     }
+ 
+     bw = ReplicationTable.getBatchWriter(conn);
+     m = new Mutation(file1);
+     m.putDelete(StatusSection.NAME, tableId1);
+     bw.addMutation(m);
+     bw.close();
+ 
+     try {
+       t.join(5000);
+     } catch (InterruptedException e) {
+       Assert.fail("ReplicationOperations.drain did not complete");
+     }
+ 
+     // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did)
+     Assert.assertTrue("Drain didn't finish", done.get());
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --cc server/tserver/src/main/findbugs/exclude-filter.xml
index 45f6a78,47dd1f5..a334163
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@@ -18,7 -18,13 +18,13 @@@
    <Match>
      <!-- locking is confusing, but probably correct -->
      <Class name="org.apache.accumulo.tserver.tablet.Tablet" />
 -    <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,java.util.Collection,boolean" returns="boolean" />
 +    <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,org.apache.accumulo.tserver.log.DfsLogger,boolean" returns="boolean" />
      <Bug code="UL" pattern="UL_UNRELEASED_LOCK" />
    </Match>
+   <Match>
+     <!-- false positive about forced garbage collection in resource manager -->
+     <Class name="org.apache.accumulo.tserver.TabletServerResourceManager" />
+     <Method name="&lt;init&gt;" params="org.apache.accumulo.tserver.TabletServer,org.apache.accumulo.server.fs.VolumeManager" returns="void" />
+     <Bug code="DM" pattern="DM_GC" />
+   </Match>
  </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 498cbdd,254e5d6..64d2052
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -38,9 -37,6 +38,7 @@@ import org.apache.accumulo.core.data.Ke
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.protobuf.ProtobufUtil;
  import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
- import org.apache.accumulo.core.replication.StatusUtil;
- import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.server.conf.TableConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index 685d71a,b78a311..27f1f69
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@@ -16,7 -16,10 +16,9 @@@
   */
  package org.apache.accumulo.test;
  
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.File;
 -import java.util.Collections;
  import java.util.UUID;
  
  import org.apache.accumulo.core.client.BatchWriter;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 9af5445,5b89d9c..fbe6900
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@@ -38,10 -38,8 +38,9 @@@ import org.apache.accumulo.core.data.Va
  import org.apache.accumulo.core.master.thrift.MasterClientService;
  import org.apache.accumulo.core.metadata.MetadataTable;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
  import org.apache.accumulo.core.protobuf.ProtobufUtil;
  import org.apache.accumulo.core.replication.ReplicationTable;
- import org.apache.accumulo.core.replication.proto.Replication.Status;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.security.Credentials;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1591f03/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 718cda1,54348db..ca58a59
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -44,10 -43,8 +44,9 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.TableNotFoundException;
  import org.apache.accumulo.core.client.ZooKeeperInstance;
  import org.apache.accumulo.core.client.admin.TableOperations;
- import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
@@@ -78,10 -71,15 +74,14 @@@ import org.apache.accumulo.fate.zookeep
  import org.apache.accumulo.gc.SimpleGarbageCollector;
  import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 -import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.accumulo.server.master.state.TServerInstance;
+ import org.apache.accumulo.server.replication.ReplicaSystemFactory;
  import org.apache.accumulo.server.replication.StatusCombiner;
+ import org.apache.accumulo.server.replication.StatusFormatter;
+ import org.apache.accumulo.server.replication.StatusUtil;
+ import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.ReplicationTableUtil;
  import org.apache.accumulo.test.functional.ConfigurableMacIT;
 -import org.apache.accumulo.tserver.TabletServer;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;


Mime
View raw message