accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [23/35] git commit: Merge remote-tracking branch 'origin/master' into ACCUMULO-378
Date Thu, 05 Jun 2014 04:43:05 GMT
Merge remote-tracking branch 'origin/master' into ACCUMULO-378

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e81eee7f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e81eee7f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e81eee7f

Branch: refs/heads/ACCUMULO-378
Commit: e81eee7f7cd2641ffdace5af48a5027f7fcce620
Parents: 73d34ec f280e97
Author: Josh Elser <elserj@apache.org>
Authored: Tue Jun 3 21:38:00 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Jun 3 21:38:00 2014 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ZooKeeperInstance.java |    7 +-
 .../core/client/impl/ConditionalWriterImpl.java |    4 +-
 .../client/impl/InstanceOperationsImpl.java     |    3 +-
 .../accumulo/core/client/impl/Namespaces.java   |    3 +-
 .../core/client/impl/RootTabletLocator.java     |   14 +-
 .../accumulo/core/client/impl/ServerClient.java |   15 +-
 .../accumulo/core/client/impl/Tables.java       |    3 +-
 .../core/client/impl/ZookeeperLockChecker.java  |   10 +-
 .../org/apache/accumulo/core/conf/Property.java |    8 +-
 .../iterators/conf/ColumnToClassMapping.java    |    1 +
 .../accumulo/core/util/AsyncSocketAppender.java |    3 +-
 .../core/client/ZooKeeperInstanceTest.java      |  144 +
 .../core/client/impl/RootTabletLocatorTest.java |   61 +
 .../client/impl/ZookeeperLockCheckerTest.java   |   58 +
 .../core/util/AsyncSocketAppenderTest.java      |    8 +-
 docs/src/main/asciidoc/chapters/replication.txt |   21 +
 .../accumulo/fate/zookeeper/ZooCache.java       |   16 +-
 .../fate/zookeeper/ZooCacheFactory.java         |   78 +
 .../apache/accumulo/fate/zookeeper/ZooLock.java |    2 +-
 .../fate/zookeeper/ZooReaderWriter.java         |    4 -
 .../fate/zookeeper/ZooCacheFactoryTest.java     |   87 +
 .../accumulo/server/client/HdfsZooInstance.java |    3 +-
 .../server/conf/NamespaceConfiguration.java     |    3 +-
 .../server/conf/TableConfiguration.java         |    3 +-
 .../accumulo/server/conf/ZooConfiguration.java  |    5 +-
 .../accumulo/server/tablets/TabletTime.java     |    1 -
 .../zookeeper/ZooReaderWriterFactory.java       |    2 -
 .../apache/accumulo/server/AccumuloTest.java    |    1 -
 .../server/watcher/MonitorLog4jWatcherTest.java |    8 +-
 .../accumulo/tserver/CompactionStats.java       |   59 -
 .../accumulo/tserver/CompactionWatcher.java     |  110 -
 .../org/apache/accumulo/tserver/Compactor.java  |  548 ---
 .../apache/accumulo/tserver/FileManager.java    |   12 +-
 .../apache/accumulo/tserver/InMemoryMap.java    |    2 +-
 .../accumulo/tserver/MinorCompactionReason.java |   21 +
 .../apache/accumulo/tserver/MinorCompactor.java |  146 -
 .../java/org/apache/accumulo/tserver/Rate.java  |   60 -
 .../org/apache/accumulo/tserver/RootFiles.java  |  133 -
 .../tserver/TConstraintViolationException.java  |   54 +
 .../org/apache/accumulo/tserver/Tablet.java     | 3856 ------------------
 .../tserver/TabletIteratorEnvironment.java      |    8 +-
 .../apache/accumulo/tserver/TabletServer.java   |   83 +-
 .../tserver/TabletServerResourceManager.java    |   67 +-
 .../accumulo/tserver/TabletStatsKeeper.java     |    6 +
 .../apache/accumulo/tserver/log/DfsLogger.java  |   60 +-
 .../accumulo/tserver/log/LocalWALRecovery.java  |   14 +-
 .../tserver/log/TabletServerLogger.java         |    4 +-
 .../apache/accumulo/tserver/tablet/Batch.java   |   51 +
 .../accumulo/tserver/tablet/CommitSession.java  |  121 +
 .../accumulo/tserver/tablet/CompactionInfo.java |  129 +
 .../tserver/tablet/CompactionRunner.java        |   76 +
 .../tserver/tablet/CompactionStats.java         |   59 +
 .../tserver/tablet/CompactionWatcher.java       |  110 +
 .../accumulo/tserver/tablet/Compactor.java      |  424 ++
 .../tserver/tablet/CountingIterator.java        |   78 +
 .../tserver/tablet/DatafileManager.java         |  605 +++
 .../apache/accumulo/tserver/tablet/KVEntry.java |   39 +
 .../tserver/tablet/MinorCompactionTask.java     |   99 +
 .../accumulo/tserver/tablet/MinorCompactor.java |  142 +
 .../apache/accumulo/tserver/tablet/Rate.java    |   60 +
 .../accumulo/tserver/tablet/RootFiles.java      |  133 +
 .../accumulo/tserver/tablet/ScanBatch.java      |   37 +
 .../accumulo/tserver/tablet/ScanDataSource.java |  222 +
 .../accumulo/tserver/tablet/ScanOptions.java    |   82 +
 .../apache/accumulo/tserver/tablet/Scanner.java |  136 +
 .../accumulo/tserver/tablet/SplitInfo.java      |   76 +
 .../accumulo/tserver/tablet/SplitRowSpec.java   |   29 +
 .../apache/accumulo/tserver/tablet/Tablet.java  | 2581 ++++++++++++
 .../tserver/tablet/TabletClosedException.java   |   29 +
 .../tserver/tablet/TabletCommitter.java         |   51 +
 .../accumulo/tserver/tablet/TabletMemory.java   |  190 +
 .../accumulo/tserver/CountingIteratorTest.java  |    2 +-
 .../apache/accumulo/tserver/RootFilesTest.java  |  149 -
 .../accumulo/tserver/tablet/RootFilesTest.java  |  150 +
 .../test/functional/MonitorLoggingIT.java       |    1 -
 test/system/continuous/master-agitator.pl       |    3 +-
 76 files changed, 6430 insertions(+), 5253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8ad849b,1200fd1..59955f3
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -454,26 -441,6 +454,26 @@@ public enum Property 
    GENERAL_MAVEN_PROJECT_BASEDIR(AccumuloClassLoader.MAVEN_PROJECT_BASEDIR_PROPERTY_NAME, AccumuloClassLoader.DEFAULT_MAVEN_PROJECT_BASEDIR_VALUE,
        PropertyType.ABSOLUTEPATH, "Set this to automatically add maven target/classes directories to your dynamic classpath"),
  
 +  // General properties for configuring replication
 +  REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX, "Properties in this category affect the replication of data to other Accumulo instances."),
 +  REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, "Properties in this category control what systems data can be replicated to"),
 +  REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
 +  @Sensitive
 +  REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
 +  REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
 +  REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
 +  REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
 +  REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
 +  REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
 +  REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"),
-   REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"),
-   REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
++  REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replication"),
++  REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "30s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
 +  REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
 +  REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.SequentialWorkAssigner", PropertyType.CLASSNAME,
 +      "Replication WorkAssigner implementation to use"),
-   REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
-   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"),
++  REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work, not useful outside of tests"),
++  REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work, not useful outside of tests"),
 +
    ;
  
    private String key, defaultValue, description;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/docs/src/main/asciidoc/chapters/replication.txt
----------------------------------------------------------------------
diff --cc docs/src/main/asciidoc/chapters/replication.txt
index 9f367df,0000000..dc87b62
mode 100644,000000..100644
--- a/docs/src/main/asciidoc/chapters/replication.txt
+++ b/docs/src/main/asciidoc/chapters/replication.txt
@@@ -1,184 -1,0 +1,205 @@@
 +// Licensed to the Apache Software Foundation (ASF) under one or more
 +// contributor license agreements.  See the NOTICE file distributed with
 +// this work for additional information regarding copyright ownership.
 +// The ASF licenses this file to You under the Apache License, Version 2.0
 +// (the "License"); you may not use this file except in compliance with
 +// the License.  You may obtain a copy of the License at
 +//
 +//     http://www.apache.org/licenses/LICENSE-2.0
 +//
 +// Unless required by applicable law or agreed to in writing, software
 +// distributed under the License is distributed on an "AS IS" BASIS,
 +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +// See the License for the specific language governing permissions and
 +// limitations under the License.
 +
 +== Replication
 +
 +=== Overview
 +
 +Replication is a feature of Accumulo which provides a mechanism to automatically
 +copy data to other systems, typically for the purpose of disaster recovery,
 +high availability, or geographic locality. It is best to consider this feature
 +as a framework for automatic replication instead of the ability to copy data
 +from one Accumulo instance to another, as copying to another Accumulo cluster is
 +only an implementation detail. The local Accumulo cluster is hereafter referred
 +to as the +primary+ while systems being replicated to are known as
 ++peers+.
 +
 +This replication framework makes two Accumulo instances, where one instance
 +replicates to another, eventually consistent between one another, as opposed
 +to the strong consistency that each single Accumulo instance still holds. That
 +is to say, attempts to read data from a table on a peer which has pending replication
 +from the primary will not wait for that data to be replicated before running the scan.
 +This is desirable for a number of reasons; the most important is that the replication
 +framework is not limited by network outages or offline peers, but only by the HDFS
 +space available on the primary system.
 +
 +Replication configurations can be considered as a directed graph which allows cycles.
 +The systems from which data was replicated are recorded in each Mutation, which
 +allows each system to determine if a peer already has the data that
 +the system wants to send.
 +
 +Data is replicated by using the Write-Ahead logs (WAL) that each TabletServer is
 +already maintaining. TabletServers record which WALs have data that needs to be
 +replicated to the +accumulo.metadata+ table. The Master uses these records,
 +combined with the local Accumulo table that the WAL was used with, to create records
 +in the +replication+ table which track which peers the given WAL should be
 +replicated to. The Master later uses these work entries to assign the actual
 +replication task to a local TabletServer using ZooKeeper. A TabletServer will get
 +a lock in ZooKeeper for the replication of this file to a peer, and proceed to
 +replicate to the peer, recording progress in the +replication+ table as
 +data is successfully replicated on the peer. Later, the Master and Garbage Collector
 +will remove records from the +accumulo.metadata+ and +replication+ tables
 +and files from HDFS, respectively, after replication to all peers is complete.
 +
 +=== Configuration
 +
 +Configuration of Accumulo to replicate data to another system can be categorized
 +into the following sections.
 +
 +==== Site Configuration
 +
 +Each system involved in replication (even the primary) needs a name that uniquely
 +identifies it across all peers in the replication graph. This should be considered
 +fixed for an instance, and set in +accumulo-site.xml+.
 +
 +----
 +<property>
 +    <name>replication.name</name>
 +    <value>primary</value>
 +    <description>Unique name for this system used by replication</description>
 +</property>
 +----
 +
 +==== Instance Configuration
 +
 +For each peer of this system, Accumulo needs to know the name of that peer,
 +the class used to replicate data to that system and some configuration information
 +to connect to this remote peer. In the case of Accumulo, this additional data
 +is the Accumulo instance name and ZooKeeper quorum; however, this varies on the
 +replication implementation for the peer.
 +
 +These can be set in the site configuration to ease deployments; however, as they may
 +change, it can be useful to set this information using the Accumulo shell.
 +
 +To configure a peer with the name +peer1+ which is an Accumulo system with an instance name of +accumulo_peer+
 +and a ZooKeeper quorum of +10.0.0.1,10.0.2.1,10.0.3.1+, invoke the following
 +command in the shell.
 +
 +----
 +root@accumulo_primary> config -s
 +replication.peer.peer1=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,accumulo_peer,10.0.0.1,10.0.2.1,10.0.3.1
 +----
 +
 +Since this is an Accumulo system, we also want to set a username and password
 +to use when authenticating with this peer. On our peer, we make a special user
 +which has permission to write to the tables we want to replicate data into, "replication"
 +with a password of "password". We then need to record this in the primary's configuration.
 +
 +----
 +root@accumulo_primary> config -s replication.peer.user.peer1=replication
 +root@accumulo_primary> config -s replication.peer.password.peer1=password
 +----
 +
 +==== Table Configuration
 +
 +Now, we presently have a peer defined, so we just need to configure which tables will
 +replicate to that peer. We also need to configure an identifier to determine where
 +this data will be replicated on the peer. Since we're replicating to another Accumulo
 +cluster, this is a table ID. In this example, we want to enable replication on
 ++my_table+ and configure our peer +accumulo_peer+ as a target, sending
 +the data to the table with an ID of +2+ in +accumulo_peer+.
 +
 +----
 +root@accumulo_primary> config -t my_table -s table.replication=true
 +root@accumulo_primary> config -t my_table -s table.replication.target.accumulo_peer=2
 +----
 +
 +To replicate a single table on the primary to multiple peers, the second command
 +in the above shell snippet can be issued, for each peer and remote identifier pair.
 +
 +=== Monitoring
 +
 +Basic information about replication status from a primary can be found on the Accumulo
 +Monitor server, using the +Replication+ link in the sidebar.
 +
 +On this page, information is broken down into the following sections:
 +
 +1. Files pending replication by peer and target
 +2. Files queued for replication, with progress made
 +
 +=== Work Assignment
 +
 +Depending on the schema of a table, different implementations of the WorkAssigner can
 +be configured. The implementation is controlled via the property +replication.work.assigner+
 +and the full class name for the implementation. This can be configured via the shell or
 ++accumulo-site.xml+.
 +
 +----
 +<property>
 +    <name>replication.work.assigner</name>
 +    <value>org.apache.accumulo.master.replication.SequentialWorkAssigner</value>
 +    <description>Implementation used to assign work for replication</description>
 +</property>
 +----
 +
 +----
 +root@accumulo_primary> config -t my_table -s replication.work.assigner=org.apache.accumulo.master.replication.SequentialWorkAssigner
 +----
 +
 +Two implementations are provided. By default, the +SequentialWorkAssigner+ is configured for an
 +instance. The SequentialWorkAssigner ensures that, per peer and each remote identifier, each WAL is
 +replicated in the order in which they were created. This is sufficient to ensure that updates to a table
 +will be replayed in the correct order on the peer. This implementation has the downside of only replicating
 +a single WAL at a time.
 +
 +The second implementation, the +UnorderedWorkAssigner+ can be used to overcome the limitation
 +of only a single WAL being replicated to a target and peer at any time. Depending on the table schema,
 +it's possible that multiple versions of the same Key with different values are infrequent or nonexistent.
 +In this case, parallel replication to a peer and target is possible without any downsides. In the case
 +where this implementation is used where column updates are frequent, it is possible that there will be
 +an inconsistency between the primary and the peer.
 +
 +=== ReplicaSystems
 +
 ++ReplicaSystem+ is the interface which allows abstraction of replication of data
 +to peers of various types. Presently, only an +AccumuloReplicaSystem+ is provided
 +which will replicate data to another Accumulo instance. A +ReplicaSystem+ implementation
 +is run inside of the TabletServer process, and can be configured as mentioned in the 
 ++Instance Configuration+ section of this document. Theoretically, an implementation
 +of this interface could send data to other filesystems, databases, etc.
 +
 +==== AccumuloReplicaSystem
 +
 +The +AccumuloReplicaSystem+ uses Thrift to communicate with a peer Accumulo instance
 +and replicate the necessary data. The TabletServer running on the primary will communicate
 +with the Master on the peer to request the address of a TabletServer on the peer which
 +this TabletServer will use to replicate the data.
 +
 +The TabletServer on the primary will then replicate data in batches of a configurable
 +size (+replication.max.unit.size+). The TabletServer on the peer will report how many
 +records were applied back to the primary, which will be used to record how many records
 +were successfully replicated. The TabletServer on the primary will continue to replicate
 +data in these batches until no more data can be read from the file.
++
++=== Other Configuration
++
++There are a number of configuration values that can be used to control how
++the implementation of various components operate.
++
++[width="75%",cols=">,^2,^2"]
++[options="header"]
++|====
++|Property | Description | Default
++|replication.max.work.queue | Maximum number of files queued for replication at one time | 1000
++|replication.work.assignment.sleep | Time between invocations of the WorkAssigner | 30s
++|replication.worker.threads | Size of threadpool used to replicate data to peers | 4
++|replication.receipt.service.port | Thrift service port to listen for replication requests, can use '0' for a random port | 10002
++|replication.work.attempts | Number of attempts to replicate to a peer before aborting the attempt | 10
++|replication.receiver.min.threads | Minimum number of idle threads for handling incoming replication | 1
++|replication.receiver.threadcheck.time | Time between attempting adjustments of thread pool for incoming replications | 30s
++|replication.max.unit.size | Maximum amount of data to be replicated in one RPC | 64M
++|replication.work.assigner | Work Assigner implementation | org.apache.accumulo.master.replication.SequentialWorkAssigner
++|tserver.replication.batchwriter.replayer.memory| Size of BatchWriter cache to use in applying replication requests | 50M
++|====

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e4c7ef9,2a453a8..689557c
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -223,8 -211,17 +213,19 @@@ import org.apache.accumulo.tserver.metr
  import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
  import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
  import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
 +import org.apache.accumulo.tserver.replication.ReplicationWorker;
+ import org.apache.accumulo.tserver.tablet.CommitSession;
+ import org.apache.accumulo.tserver.tablet.CompactionInfo;
+ import org.apache.accumulo.tserver.tablet.CompactionWatcher;
+ import org.apache.accumulo.tserver.tablet.Compactor;
+ import org.apache.accumulo.tserver.tablet.KVEntry;
+ import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
+ import org.apache.accumulo.tserver.tablet.ScanBatch;
+ import org.apache.accumulo.tserver.tablet.Scanner;
+ import org.apache.accumulo.tserver.tablet.SplitInfo;
+ import org.apache.accumulo.tserver.tablet.Tablet;
+ import org.apache.accumulo.tserver.tablet.TabletClosedException;
  import org.apache.commons.collections.map.LRUMap;
  import org.apache.hadoop.fs.FSError;
  import org.apache.hadoop.fs.FileSystem;
@@@ -3121,30 -3099,7 +3122,30 @@@ public class TabletServer extends Abstr
      return address;
    }
  
 +  private HostAndPort startReplicationService() throws UnknownHostException {
 +    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
 +    ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
 +    AccumuloConfiguration conf = getSystemConfiguration();
 +    Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
 +    ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
 +        "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
 +    this.replServer = sp.server;
 +    log.info("Started replication service on " + sp.address);
 +
 +    try {
 +      // The replication service is unique to the thrift service for a tserver, not just a host.
 +      // Advertise the host and port for replication service given the host and port for the tserver.
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
 +          sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception e) {
 +      log.error("Could not advertise replication service port", e);
 +      throw new RuntimeException(e);
 +    }
 +
 +    return sp.address;
 +  }
 +
-   ZooLock getLock() {
+   public ZooLock getLock() {
      return tabletServerLock;
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index b7b0aff,9fec437..b4f14ec
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -41,9 -36,6 +41,8 @@@ import org.apache.accumulo.core.replica
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.server.conf.TableConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.util.ReplicationTableUtil;
- import org.apache.accumulo.tserver.Tablet.CommitSession;
  import org.apache.accumulo.tserver.TabletMutations;
  import org.apache.accumulo.tserver.TabletServer;
  import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 0000000,2771db9..5b46b7b
mode 000000,100644..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@@ -1,0 -1,581 +1,605 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.tserver.tablet;
+ 
+ import java.io.IOException;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Map.Entry;
+ import java.util.Set;
+ import java.util.SortedMap;
+ import java.util.TreeMap;
+ import java.util.TreeSet;
 -import java.util.Map.Entry;
+ 
+ import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.KeyExtent;
+ import org.apache.accumulo.core.metadata.schema.DataFileValue;
++import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
++import org.apache.accumulo.core.replication.StatusUtil;
+ import org.apache.accumulo.core.security.Credentials;
+ import org.apache.accumulo.core.util.MapCounter;
+ import org.apache.accumulo.core.util.Pair;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+ import org.apache.accumulo.server.ServerConstants;
+ import org.apache.accumulo.server.client.HdfsZooInstance;
+ import org.apache.accumulo.server.fs.FileRef;
+ import org.apache.accumulo.server.fs.VolumeManager;
+ import org.apache.accumulo.server.master.state.TServerInstance;
+ import org.apache.accumulo.server.security.SystemCredentials;
+ import org.apache.accumulo.server.util.MasterMetadataUtil;
+ import org.apache.accumulo.server.util.MetadataTableUtil;
++import org.apache.accumulo.server.util.ReplicationTableUtil;
+ import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+ import org.apache.accumulo.trace.instrument.Span;
+ import org.apache.accumulo.trace.instrument.Trace;
+ import org.apache.accumulo.tserver.TLevel;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.log4j.Logger;
+ 
+ class DatafileManager {
+   private final Logger log = Logger.getLogger(DatafileManager.class);
+   // access to datafileSizes needs to be synchronized: see CompactionRunner#getNumFiles
+   private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
+   private final Tablet tablet;
+   
+   // ensure we only have one reader/writer of our bulk file notes at a time
+   private final Object bulkFileImportLock = new Object();
+ 
+   DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) {
+     for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet()) {
+       this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
+     }
+     this.tablet = tablet;
+   }
+ 
+   private FileRef mergingMinorCompactionFile = null;
+   private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
+   private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
+   private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
+   private long nextScanReservationId = 0;
+   private boolean reservationsBlocked = false;
+ 
+   private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
+   
+   static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
+     if (!fs.rename(src, dst)) {
+       throw new IOException("Rename " + src + " to " + dst + " returned false ");
+     }
+   }
+ 
+   Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
+     synchronized (tablet) {
+ 
+       while (reservationsBlocked) {
+         try {
+           tablet.wait(50);
+         } catch (InterruptedException e) {
+           log.warn(e, e);
+         }
+       }
+ 
+       Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
+ 
+       long rid = nextScanReservationId++;
+ 
+       scanFileReservations.put(rid, absFilePaths);
+ 
+       Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+ 
+       for (FileRef path : absFilePaths) {
+         fileScanReferenceCounts.increment(path, 1);
+         ret.put(path, datafileSizes.get(path));
+       }
+ 
+       return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
+     }
+   }
+ 
+   void returnFilesForScan(Long reservationId) {
+ 
+     final Set<FileRef> filesToDelete = new HashSet<FileRef>();
+ 
+     synchronized (tablet) {
+       Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
+ 
+       if (absFilePaths == null)
+         throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
+ 
+       boolean notify = false;
+       for (FileRef path : absFilePaths) {
+         long refCount = fileScanReferenceCounts.decrement(path, 1);
+         if (refCount == 0) {
+           if (filesToDeleteAfterScan.remove(path))
+             filesToDelete.add(path);
+           notify = true;
+         } else if (refCount < 0)
+           throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
+       }
+ 
+       if (notify)
+         tablet.notifyAll();
+     }
+ 
+     if (filesToDelete.size() > 0) {
+       log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+       MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+     }
+   }
+ 
+   void removeFilesAfterScan(Set<FileRef> scanFiles) {
+     if (scanFiles.size() == 0)
+       return;
+ 
+     Set<FileRef> filesToDelete = new HashSet<FileRef>();
+ 
+     synchronized (tablet) {
+       for (FileRef path : scanFiles) {
+         if (fileScanReferenceCounts.get(path) == 0)
+           filesToDelete.add(path);
+         else
+           filesToDeleteAfterScan.add(path);
+       }
+     }
+ 
+     if (filesToDelete.size() > 0) {
+       log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+       MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+     }
+   }
+ 
+   private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
+     long startTime = System.currentTimeMillis();
+     TreeSet<FileRef> inUse = new TreeSet<FileRef>();
+ 
+     Span waitForScans = Trace.start("waitForScans");
+     try {
+       synchronized (tablet) {
+         if (blockNewScans) {
+           if (reservationsBlocked)
+             throw new IllegalStateException();
+ 
+           reservationsBlocked = true;
+         }
+ 
+         for (FileRef path : pathsToWaitFor) {
+           while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
+             try {
+               tablet.wait(100);
+             } catch (InterruptedException e) {
+               log.warn(e, e);
+             }
+           }
+         }
+ 
+         for (FileRef path : pathsToWaitFor) {
+           if (fileScanReferenceCounts.get(path) > 0)
+             inUse.add(path);
+         }
+ 
+         if (blockNewScans) {
+           reservationsBlocked = false;
+           tablet.notifyAll();
+         }
+ 
+       }
+     } finally {
+       waitForScans.stop();
+     }
+     return inUse;
+   }
+ 
+   public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
+ 
+     final KeyExtent extent = tablet.getExtent();
+     String bulkDir = null;
+ 
+     Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
+     for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
+       paths.put(entry.getKey(), entry.getValue());
+ 
+     for (FileRef tpath : paths.keySet()) {
+ 
+       boolean inTheRightDirectory = false;
+       Path parent = tpath.path().getParent().getParent();
+       for (String tablesDir : ServerConstants.getTablesDirs()) {
+         if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) {
+           inTheRightDirectory = true;
+           break;
+         }
+       }
+       if (!inTheRightDirectory) {
+         throw new IOException("Data file " + tpath + " not in table dirs");
+       }
+ 
+       if (bulkDir == null)
+         bulkDir = tpath.path().getParent().toString();
+       else if (!bulkDir.equals(tpath.path().getParent().toString()))
+         throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
+ 
+     }
+ 
+     if (tablet.getExtent().isRootTablet()) {
+       throw new IllegalArgumentException("Can not import files to root tablet");
+     }
+ 
+     synchronized (bulkFileImportLock) {
+       Credentials creds = SystemCredentials.get();
+       Connector conn;
+       try {
+         conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
+       } catch (Exception ex) {
+         throw new IOException(ex);
+       }
+       // Remove any bulk files we've previously loaded and compacted away
+       List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
+ 
+       for (FileRef file : files)
+         if (paths.keySet().remove(file))
+           log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
+ 
+       if (paths.size() > 0) {
+         long bulkTime = Long.MIN_VALUE;
+         if (setTime) {
+           for (DataFileValue dfv : paths.values()) {
+             long nextTime = tablet.getAndUpdateTime();
+             if (nextTime < bulkTime)
+               throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
+             bulkTime = nextTime;
+             dfv.setTime(bulkTime);
+           }
+         }
+         
+         tablet.updatePersistedTime(bulkTime, paths, tid);
+       }
+     }
+ 
+     synchronized (tablet) {
+       for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
+         if (datafileSizes.containsKey(tpath.getKey())) {
+           log.error("Adding file that is already in set " + tpath.getKey());
+         }
+         datafileSizes.put(tpath.getKey(), tpath.getValue());
+ 
+       }
+ 
+       tablet.getTabletResources().importedMapFiles();
+ 
+       tablet.computeNumEntries();
+     }
+ 
+     for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
+       log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue());
+     }
+   }
+ 
+   FileRef reserveMergingMinorCompactionFile() {
+     if (mergingMinorCompactionFile != null)
+       throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved  : " + mergingMinorCompactionFile);
+ 
+     if (tablet.getExtent().isRootTablet())
+       return null;
+ 
+     int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet();
+ 
+     // when a major compaction is running and we are at max files, write out
+     // one extra file... want to avoid the case where major compaction is
+     // compacting everything except for the largest file, and therefore the
+     // largest file is returned for merging.. the following check mostly
+     // avoids this case, except for the case where major compactions fail or
+     // are canceled
+     if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
+       return null;
+ 
+     if (datafileSizes.size() >= maxFiles) {
+       // find the smallest file
+ 
+       long min = Long.MAX_VALUE;
+       FileRef minName = null;
+ 
+       for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+         if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
+           min = entry.getValue().getSize();
+           minName = entry.getKey();
+         }
+       }
+ 
+       if (minName == null)
+         return null;
+ 
+       mergingMinorCompactionFile = minName;
+       return minName;
+     }
+ 
+     return null;
+   }
+ 
+   void unreserveMergingMinorCompactionFile(FileRef file) {
+     if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+         || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+       throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+ 
+     mergingMinorCompactionFile = null;
+   }
+ 
+   void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+       throws IOException {
+ 
+     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+     if (tablet.getExtent().isRootTablet()) {
+       try {
+         if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+           throw new IllegalStateException();
+         }
+       } catch (Exception e) {
+         throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+       }
+     }
+ 
+     // rename before putting in metadata table, so files in metadata table should
+     // always exist
+     do {
+       try {
+         if (dfv.getNumEntries() == 0) {
+           tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path());
+         } else {
+           if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+             log.warn("Target map file already exist " + newDatafile);
+             tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+           }
+ 
+           rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+         }
+         break;
+       } catch (IOException ioe) {
+         log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
+         UtilWaitThread.sleep(60 * 1000);
+       }
+     } while (true);
+ 
+     long t1, t2;
+ 
+     // the code below always assumes merged files are in use by scans... this must be done
+     // because the in memory list of files is not updated until after the metadata table
+     // therefore the file is available to scans until memory is updated, but want to ensure
+     // the file is not available for garbage collection... if memory were updated
+     // before this point (like major compactions do), then the following code could wait
+     // for scans to finish like major compactions do.... used to wait for scans to finish
+     // here, but that was incorrect because a scan could start after waiting but before
+     // memory was updated... assuming the file is always in use by scans leads to
+     // one unneeded metadata update when it was not actually in use
+     Set<FileRef> filesInUseByScans = Collections.emptySet();
+     if (absMergeFile != null)
+       filesInUseByScans = Collections.singleton(absMergeFile);
+ 
+     // very important to write delete entries outside of log lock, because
+     // this metadata write does not go up... it goes sideways or to itself
+     if (absMergeFile != null)
+       MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get());
+ 
+     Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
++    boolean replicate = ReplicationConfigurationUtil.isEnabled(tablet.getExtent(), tablet.getTableConfiguration());
++    Set<String> logFileOnly = null;
++    if (replicate) {
++      // unusedWalLogs is of the form host/fileURI, need to strip off the host portion
++      logFileOnly = new HashSet<>();
++      for (String unusedWalLog : unusedWalLogs) {
++        int index = unusedWalLog.indexOf('/');
++        if (-1 == index) {
++          log.warn("Could not find host component to strip from DFSLogger representation of WAL");
++        } else {
++          unusedWalLog = unusedWalLog.substring(index + 1);
++        }
++        logFileOnly.add(unusedWalLog);
++      }
++    }
+     try {
+       // the order of writing to metadata and walog is important in the face of machine/process failures
+       // need to write to metadata before writing to walog, when things are done in the reverse order
+       // data could be lost... the minor compaction start event should be written before the following metadata
+       // write is made
+ 
+       tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId);
+ 
++      // Mark that we have data we want to replicate
++      // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
++      // but it is *not* closed
++      if (replicate) {
++        ReplicationTableUtil.updateFiles(SystemCredentials.get(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
++      }
+     } finally {
+       tablet.finishClearingUnusedLogs();
+     }
+ 
+     do {
+       try {
+         // the purpose of making this update use the new commit session, instead of the old one passed in,
+         // is because the new one will reference the logs used by current memory...
+         
+         tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
+         break;
+       } catch (IOException e) {
+         log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
+         UtilWaitThread.sleep(1 * 1000);
+       }
+     } while (true);
+ 
+     synchronized (tablet) {
+       t1 = System.currentTimeMillis();
+ 
+       if (datafileSizes.containsKey(newDatafile)) {
+         log.error("Adding file that is already in set " + newDatafile);
+       }
+       
+       if (dfv.getNumEntries() > 0) {
+         datafileSizes.put(newDatafile, dfv);
+       }
+       
+       if (absMergeFile != null) {
+         datafileSizes.remove(absMergeFile);
+       }
+       
+       unreserveMergingMinorCompactionFile(absMergeFile);
+       
+       tablet.flushComplete(flushId);
+       
+       t2 = System.currentTimeMillis();
+     }
+ 
+     // must do this after list of files in memory is updated above
+     removeFilesAfterScan(filesInUseByScans);
+ 
+     if (absMergeFile != null)
+       log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
+     else
+       log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile);
+     log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString()));
+     long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+     if (dfv.getSize() > splitSize) {
+       log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d", splitSize, dfv.getSize()));
+     }
+   }
+ 
+   public void reserveMajorCompactingFiles(Collection<FileRef> files) {
+     if (majorCompactingFiles.size() != 0)
+       throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+ 
+     if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
+       throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
+ 
+     majorCompactingFiles.addAll(files);
+   }
+ 
+   public void clearMajorCompactingFile() {
+     majorCompactingFiles.clear();
+   }
+ 
+   void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+       throws IOException {
+     final KeyExtent extent = tablet.getExtent();
+     long t1, t2;
+ 
+     if (!extent.isRootTablet()) {
+ 
+       if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+         log.error("Target map file already exist " + newDatafile, new Exception());
+         throw new IllegalStateException("Target map file already exist " + newDatafile);
+       }
+ 
+       // rename before putting in metadata table, so files in metadata table should
+       // always exist
+       rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+ 
+       if (dfv.getNumEntries() == 0) {
+         tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+       }
+     }
+ 
+     TServerInstance lastLocation = null;
+     synchronized (tablet) {
+ 
+       t1 = System.currentTimeMillis();
+ 
+       IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ 
+       tablet.incrementDataSourceDeletions();
+ 
+       if (extent.isRootTablet()) {
+ 
+         waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+ 
+         try {
+           if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+             throw new IllegalStateException();
+           }
+         } catch (Exception e) {
+           throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+         }
+ 
+         // mark files as ready for deletion, but
+         // do not delete them until we successfully
+         // rename the compacted map file, in case
+         // the system goes down
+ 
+         RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile);
+       }
+ 
+       // atomically remove old files and add new file
+       for (FileRef oldDatafile : oldDatafiles) {
+         if (!datafileSizes.containsKey(oldDatafile)) {
+           log.error("file does not exist in set " + oldDatafile);
+         }
+         datafileSizes.remove(oldDatafile);
+         majorCompactingFiles.remove(oldDatafile);
+       }
+ 
+       if (datafileSizes.containsKey(newDatafile)) {
+         log.error("Adding file that is already in set " + newDatafile);
+       }
+ 
+       if (dfv.getNumEntries() > 0) {
+         datafileSizes.put(newDatafile, dfv);
+       }
+ 
+       // could be used by a follow on compaction in a multipass compaction
+       majorCompactingFiles.add(newDatafile);
+ 
+       tablet.computeNumEntries();
+ 
+       lastLocation = tablet.resetLastLocation();
+ 
+       tablet.setLastCompactionID(compactionId);
+       t2 = System.currentTimeMillis();
+     }
+ 
+     if (!extent.isRootTablet()) {
+       Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+       if (filesInUseByScans.size() > 0)
+         log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+       MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+           tablet.getTabletServer().getClientAddressString(), lastLocation, tablet.getTabletServer().getLock());
+       removeFilesAfterScan(filesInUseByScans);
+     }
+ 
+     log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
+     log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+   }
+ 
+   public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+     synchronized (tablet) {
+       TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
+       return Collections.unmodifiableSortedMap(copy);
+     }
+   }
+ 
+   public Set<FileRef> getFiles() {
+     synchronized (tablet) {
+       HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
+       return Collections.unmodifiableSet(files);
+     }
+   }
+   
+   public int getNumFiles() {
+     return datafileSizes.size();
+   }
+ 
+ }


Mime
View raw message