hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [07/26] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
Date Tue, 15 Dec 2015 17:43:21 GMT
HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26ac60b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26ac60b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26ac60b0

Branch: refs/heads/hbase-12439
Commit: 26ac60b03f80c9215103a02db783341e67037753
Parents: 9647fee
Author: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Authored: Thu Dec 10 13:07:46 2015 +0530
Committer: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Committed: Thu Dec 10 13:07:46 2015 +0530

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeers.java     |   2 +-
 .../replication/ReplicationPeersZKImpl.java     |  26 +-
 .../hbase/replication/ReplicationQueues.java    |  25 +-
 .../replication/ReplicationQueuesClient.java    |  25 +-
 .../ReplicationQueuesClientZKImpl.java          |  37 ++
 .../replication/ReplicationQueuesZKImpl.java    |  70 +++
 .../replication/ReplicationStateZKBase.java     |  14 +-
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  24 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  16 +-
 .../MetricsReplicationSinkSource.java           |   2 +
 .../MetricsReplicationSourceSource.java         |   6 +
 .../MetricsReplicationGlobalSourceSource.java   |  21 +
 .../MetricsReplicationSinkSourceImpl.java       |   7 +
 .../MetricsReplicationSourceSourceImpl.java     |  28 +
 .../hbase/protobuf/generated/AdminProtos.java   | 602 +++++++++++++++++--
 hbase-protocol/src/main/protobuf/Admin.proto    |   3 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 152 +++--
 .../hbase/protobuf/ReplicationProtbufUtil.java  |  46 +-
 .../hbase/regionserver/RSRpcServices.java       |   4 +-
 .../regionserver/ReplicationSinkService.java    |   8 +-
 .../regionserver/wal/WALActionsListener.java    |  19 +-
 .../hbase/replication/ScopeWALEntryFilter.java  |  72 ++-
 .../replication/TableCfWALEntryFilter.java      |  76 ++-
 .../master/ReplicationHFileCleaner.java         | 193 ++++++
 .../DefaultSourceFSConfigurationProvider.java   |  78 +++
 .../HBaseInterClusterReplicationEndpoint.java   |  32 +-
 .../regionserver/HFileReplicator.java           | 393 ++++++++++++
 .../replication/regionserver/MetricsSink.java   |  13 +-
 .../replication/regionserver/MetricsSource.java |  31 +
 .../RegionReplicaReplicationEndpoint.java       |   4 +-
 .../replication/regionserver/Replication.java   | 133 +++-
 .../regionserver/ReplicationSink.java           | 200 +++++-
 .../regionserver/ReplicationSource.java         |  92 ++-
 .../ReplicationSourceInterface.java             |  13 +
 .../regionserver/ReplicationSourceManager.java  |  21 +
 .../SourceFSConfigurationProvider.java          |  40 ++
 .../security/access/SecureBulkLoadEndpoint.java |  18 +-
 .../cleaner/TestReplicationHFileCleaner.java    | 264 ++++++++
 .../replication/ReplicationSourceDummy.java     |   8 +
 .../replication/TestMasterReplication.java      | 313 +++++++++-
 .../replication/TestReplicationSmallTests.java  |   3 +-
 .../replication/TestReplicationStateBasic.java  |  57 ++
 .../replication/TestReplicationStateZKImpl.java |   1 +
 .../replication/TestReplicationSyncUpTool.java  |  10 +-
 ...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ++++++++
 .../regionserver/TestReplicationSink.java       | 179 +++++-
 .../TestReplicationSourceManager.java           |  70 ++-
 .../TestSourceFSConfigurationProvider.java      |  25 +
 48 files changed, 3444 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 8e80e06..8bf21d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -50,7 +50,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
    * @param tableCFs the table and column-family list which will be replicated for this peer or null
-   * for all table and column families
+   *          for all table and column families
    */
   void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
       throws ReplicationException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 63f9ac3..fd10b66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 
 import com.google.protobuf.ByteString;
 
@@ -120,8 +121,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
 
       checkQueuesDeleted(id);
-      
+
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+      // If only bulk load hfile replication is enabled then add peerId node to hfile-refs node
+      if (replicationForBulkLoadEnabled) {
+        try {
+          String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+          LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+          ZKUtil.createWithParents(this.zookeeper, peerId);
+        } catch (KeeperException e) {
+          throw new ReplicationException("Failed to add peer with id=" + id
+              + ", node under hfile references node.", e);
+        }
+      }
+
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
         toByteArray(peerConfig));
@@ -151,6 +165,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
             + " because that id does not exist.");
       }
       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
+      // replication is enabled or not
+
+      String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+      try {
+        LOG.info("Removing peer " + peerId + " from hfile reference queue.");
+        ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
+      } catch (NoNodeException e) {
+        LOG.info("Did not find node " + peerId + " to delete.", e);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not remove peer with id=" + id, e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 3dbbc33..0d47a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
+ * that still need to be replicated to remote clusters.
  */
 @InterfaceAudience.Private
 public interface ReplicationQueues {
@@ -113,4 +114,26 @@ public interface ReplicationQueues {
    * @return if this is this rs's znode
    */
   boolean isThisOurZnode(String znode);
+
+  /**
+   * Add a peer to hfile reference queue if peer does not exist.
+   * @param peerId peer cluster id to be added
+   * @throws ReplicationException if fails to add a peer id to hfile reference queue
+   */
+  void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+  /**
+   * Add new hfile references to the queue.
+   * @param peerId peer cluster id to which the hfiles need to be replicated
+   * @param files list of hfile references to be added
+   * @throws ReplicationException if fails to add a hfile reference
+   */
+  void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+
+  /**
+   * Remove hfile references from the queue.
+   * @param peerId peer cluster id from which this hfile references needs to be removed
+   * @param files list of hfile references to be removed
+   */
+  void removeHFileRefs(String peerId, List<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5b3e541..7fa3bbb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the sources(WALs/HFile references) that still need to be replicated to remote
+ * clusters.
  */
 @InterfaceAudience.Private
 public interface ReplicationQueuesClient {
@@ -65,4 +66,26 @@ public interface ReplicationQueuesClient {
    * @return cversion of replication rs node
    */
   int getQueuesZNodeCversion() throws KeeperException;
+
+  /**
+   * Get the change version number of replication hfile references node. This can be used as
+   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+   * @return change version number of hfile references node
+   */
+  int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+  /**
+   * Get list of all peers from hfile reference queue.
+   * @return a list of peer ids
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+  /**
+   * Get a list of all hfile references in the given peer.
+   * @param peerId a String that identifies the peer
+   * @return a list of hfile references, null if not found any
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getReplicableHFiles(String peerId) throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e1a6a49..cc407e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -84,4 +84,41 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
       throw e;
     }
   }
+
+  @Override
+  public int getHFileRefsNodeChangeVersion() throws KeeperException {
+    Stat stat = new Stat();
+    try {
+      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
+      throw e;
+    }
+    return stat.getCversion();
+  }
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
+      throw e;
+    }
+    return result;
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
+      throw e;
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 97763e2..43dd412 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -84,6 +84,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication queues.", e);
     }
+    // If only bulk load hfile replication is enabled then create the hfile-refs znode
+    if (replicationForBulkLoadEnabled) {
+      try {
+        ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+      } catch (KeeperException e) {
+        throw new ReplicationException("Could not initialize hfile references replication queue.",
+            e);
+      }
+    }
   }
 
   @Override
@@ -431,4 +440,65 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
+
+  @Override
+  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+    }
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+        HConstants.EMPTY_BYTE_ARRAY));
+    }
+    if (debugEnabled) {
+      LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
+    }
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
+    }
+    if (debugEnabled) {
+      LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+        ZKUtil.createWithParents(this.zookeeper, peerZnode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
+          e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 4fbac0f..762167f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -48,32 +49,43 @@ public abstract class ReplicationStateZKBase {
   protected final String peersZNode;
   /** The name of the znode that contains all replication queues */
   protected final String queuesZNode;
+  /** The name of the znode that contains queues of hfile references to be replicated */
+  protected final String hfileRefsZNode;
   /** The cluster key of the local cluster */
   protected final String ourClusterKey;
   protected final ZooKeeperWatcher zookeeper;
   protected final Configuration conf;
   protected final Abortable abortable;
+  protected final boolean replicationForBulkLoadEnabled;
 
   // Public for testing
   public static final byte[] ENABLED_ZNODE_BYTES =
       toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
   public static final byte[] DISABLED_ZNODE_BYTES =
       toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+      "zookeeper.znode.replication.hfile.refs";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
 
   public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
       Abortable abortable) {
     this.zookeeper = zookeeper;
     this.conf = conf;
     this.abortable = abortable;
+    this.replicationForBulkLoadEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
 
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
     this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+    this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
   public List<String> getListOfReplicators() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c268268..9e01d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -884,7 +885,7 @@ public class ZKUtil {
               JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
           && conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
           && conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
-              
+
         return false;
       }
     } catch(Exception e) {
@@ -1797,6 +1798,27 @@ public class ZKUtil {
       } else if (child.equals(zkw.getConfiguration().
           get("zookeeper.znode.replication.rs", "rs"))) {
         appendRSZnodes(zkw, znode, sb);
+      } else if (child.equals(zkw.getConfiguration().get(
+        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+        appendHFileRefsZnodes(zkw, znode, sb);
+      }
+    }
+  }
+
+  private static void appendHFileRefsZnodes(ZooKeeperWatcher zkw, String hfileRefsZnode,
+      StringBuilder sb) throws KeeperException {
+    sb.append("\n").append(hfileRefsZnode).append(": ");
+    for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
+      String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode);
+      sb.append("\n").append(znodeToProcess).append(": ");
+      List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
+      int size = peerHFileRefsZnodes.size();
+      for (int i = 0; i < size; i++) {
+        sb.append(peerHFileRefsZnodes.get(i));
+        if (i != size - 1) {
+          sb.append(", ");
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac57514..6fafad3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,18 @@ public final class HConstants {
       REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
   public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
     "org.apache.hadoop.hbase.replication.regionserver.Replication";
+  public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+  public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
+  /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
+  public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+  /**
+   * Directory where the source cluster file system client configuration are placed which is used by
+   * sink cluster to copy HFiles from source cluster file system
+   */
+  public static final String REPLICATION_CONF_DIR = "hbase.replication.conf.dir";
+
+  /** Maximum time to retry for a failed bulk load request */
+  public static final String BULKLOAD_MAX_RETRIES_NUMBER = "hbase.bulkload.retries.number";
 
   /** HBCK special code name used as server name when manipulating ZK nodes */
   public static final String HBCK_CODE_NAME = "HBCKServerName";
@@ -1241,7 +1253,7 @@ public final class HConstants {
 
   public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
       "hbase.canary.write.table.check.period";
-  
+
   /**
    * Configuration keys for programmatic JAAS configuration for secured ZK interaction
    */
@@ -1250,7 +1262,7 @@ public final class HConstants {
       "hbase.zookeeper.client.kerberos.principal";
   public static final String ZK_SERVER_KEYTAB_FILE = "hbase.zookeeper.server.keytab.file";
   public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
-      "hbase.zookeeper.server.kerberos.principal";  
+      "hbase.zookeeper.server.kerberos.principal";
 
   private HConstants() {
     // Can't be instantiated with this ctor.

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 698a59a..9fb8415 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -22,9 +22,11 @@ public interface MetricsReplicationSinkSource {
   public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
   public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
   public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+  public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
 
   void setLastAppliedOpAge(long age);
   void incrAppliedBatches(long batches);
   void incrAppliedOps(long batchsize);
   long getLastAppliedOpAge();
+  void incrAppliedHFiles(long hfileSize);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index fecf191..188c3a3 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -32,6 +32,9 @@ public interface MetricsReplicationSourceSource {
 
   public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
 
+  public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles";
+  public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
+
   void setLastShippedAge(long age);
   void setSizeOfLogQueue(int size);
   void incrSizeOfLogQueue(int size);
@@ -44,4 +47,7 @@ public interface MetricsReplicationSourceSource {
   void incrLogReadInEdits(long size);
   void clear();
   long getLastShippedAge();
+  void incrHFilesShipped(long hfiles);
+  void incrSizeOfHFileRefsQueue(long size);
+  void decrSizeOfHFileRefsQueue(long size);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 6dace10..392cd39 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,6 +32,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableCounterLong shippedOpsCounter;
   private final MutableCounterLong shippedKBsCounter;
   private final MutableCounterLong logReadInBytesCounter;
+  private final MutableCounterLong shippedHFilesCounter;
+  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -51,6 +53,11 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
 
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+    shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_HFILES, 0L);
+
+    sizeOfHFileRefsQueueGauge =
+        rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -100,4 +107,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override public void incrHFilesShipped(long hfiles) {
+    shippedHFilesCounter.incr(hfiles);
+  }
+
+  @Override
+  public void incrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.incr(size);
+  }
+
+  @Override
+  public void decrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.decr(size);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 14212ba..8f4a337 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -26,11 +26,13 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   private final MutableGaugeLong ageGauge;
   private final MutableCounterLong batchesCounter;
   private final MutableCounterLong opsCounter;
+  private final MutableCounterLong hfilesCounter;
 
   public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
     ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L);
     batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L);
     opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L);
+    hfilesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_HFILES, 0L);
   }
 
   @Override public void setLastAppliedOpAge(long age) {
@@ -49,4 +51,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   public long getLastAppliedOpAge() {
     return ageGauge.value();
   }
+
+  @Override
+  public void incrAppliedHFiles(long hfiles) {
+    hfilesCounter.incr(hfiles);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 1422e7e..217cc3e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -32,6 +32,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String shippedOpsKey;
   private final String shippedKBsKey;
   private final String logReadInBytesKey;
+  private final String shippedHFilesKey;
+  private final String sizeOfHFileRefsQueueKey;
 
   private final MutableGaugeLong ageOfLastShippedOpGauge;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -41,6 +43,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableCounterLong shippedOpsCounter;
   private final MutableCounterLong shippedKBsCounter;
   private final MutableCounterLong logReadInBytesCounter;
+  private final MutableCounterLong shippedHFilesCounter;
+  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -69,6 +73,12 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     logEditsFilteredKey = "source." + id + ".logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+    shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+    shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(shippedHFilesKey, 0L);
+
+    sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+    sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfHFileRefsQueueKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -124,10 +134,28 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(logReadInEditsKey);
 
     rms.removeMetric(logEditsFilteredKey);
+
+    rms.removeMetric(shippedHFilesKey);
+    rms.removeMetric(sizeOfHFileRefsQueueKey);
   }
 
   @Override
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void incrHFilesShipped(long hfiles) {
+    shippedHFilesCounter.incr(hfiles);
+  }
+
+  @Override
+  public void incrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.incr(size);
+  }
+
+  @Override
+  public void decrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.decr(size);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index b4c378b..1c59ea6 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -16896,6 +16896,51 @@ public final class AdminProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntryOrBuilder getEntryOrBuilder(
         int index);
+
+    // optional string replicationClusterId = 2;
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    boolean hasReplicationClusterId();
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    java.lang.String getReplicationClusterId();
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getReplicationClusterIdBytes();
+
+    // optional string sourceBaseNamespaceDirPath = 3;
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    boolean hasSourceBaseNamespaceDirPath();
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    java.lang.String getSourceBaseNamespaceDirPath();
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    com.google.protobuf.ByteString
+        getSourceBaseNamespaceDirPathBytes();
+
+    // optional string sourceHFileArchiveDirPath = 4;
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    boolean hasSourceHFileArchiveDirPath();
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    java.lang.String getSourceHFileArchiveDirPath();
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    com.google.protobuf.ByteString
+        getSourceHFileArchiveDirPathBytes();
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicateWALEntryRequest}
@@ -16963,6 +17008,21 @@ public final class AdminProtos {
               entry_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.PARSER, extensionRegistry));
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000001;
+              replicationClusterId_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              bitField0_ |= 0x00000002;
+              sourceBaseNamespaceDirPath_ = input.readBytes();
+              break;
+            }
+            case 34: {
+              bitField0_ |= 0x00000004;
+              sourceHFileArchiveDirPath_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17005,6 +17065,7 @@ public final class AdminProtos {
       return PARSER;
     }
 
+    private int bitField0_;
     // repeated .hbase.pb.WALEntry entry = 1;
     public static final int ENTRY_FIELD_NUMBER = 1;
     private java.util.List<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry> entry_;
@@ -17041,8 +17102,140 @@ public final class AdminProtos {
       return entry_.get(index);
     }
 
+    // optional string replicationClusterId = 2;
+    public static final int REPLICATIONCLUSTERID_FIELD_NUMBER = 2;
+    private java.lang.Object replicationClusterId_;
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public boolean hasReplicationClusterId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public java.lang.String getReplicationClusterId() {
+      java.lang.Object ref = replicationClusterId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          replicationClusterId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getReplicationClusterIdBytes() {
+      java.lang.Object ref = replicationClusterId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        replicationClusterId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional string sourceBaseNamespaceDirPath = 3;
+    public static final int SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER = 3;
+    private java.lang.Object sourceBaseNamespaceDirPath_;
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public boolean hasSourceBaseNamespaceDirPath() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public java.lang.String getSourceBaseNamespaceDirPath() {
+      java.lang.Object ref = sourceBaseNamespaceDirPath_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          sourceBaseNamespaceDirPath_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public com.google.protobuf.ByteString
+        getSourceBaseNamespaceDirPathBytes() {
+      java.lang.Object ref = sourceBaseNamespaceDirPath_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        sourceBaseNamespaceDirPath_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional string sourceHFileArchiveDirPath = 4;
+    public static final int SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER = 4;
+    private java.lang.Object sourceHFileArchiveDirPath_;
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public boolean hasSourceHFileArchiveDirPath() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public java.lang.String getSourceHFileArchiveDirPath() {
+      java.lang.Object ref = sourceHFileArchiveDirPath_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          sourceHFileArchiveDirPath_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public com.google.protobuf.ByteString
+        getSourceHFileArchiveDirPathBytes() {
+      java.lang.Object ref = sourceHFileArchiveDirPath_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        sourceHFileArchiveDirPath_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       entry_ = java.util.Collections.emptyList();
+      replicationClusterId_ = "";
+      sourceBaseNamespaceDirPath_ = "";
+      sourceHFileArchiveDirPath_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -17065,6 +17258,15 @@ public final class AdminProtos {
       for (int i = 0; i < entry_.size(); i++) {
         output.writeMessage(1, entry_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(2, getReplicationClusterIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(3, getSourceBaseNamespaceDirPathBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(4, getSourceHFileArchiveDirPathBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -17078,6 +17280,18 @@ public final class AdminProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, entry_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getReplicationClusterIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getSourceBaseNamespaceDirPathBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getSourceHFileArchiveDirPathBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -17103,6 +17317,21 @@ public final class AdminProtos {
       boolean result = true;
       result = result && getEntryList()
           .equals(other.getEntryList());
+      result = result && (hasReplicationClusterId() == other.hasReplicationClusterId());
+      if (hasReplicationClusterId()) {
+        result = result && getReplicationClusterId()
+            .equals(other.getReplicationClusterId());
+      }
+      result = result && (hasSourceBaseNamespaceDirPath() == other.hasSourceBaseNamespaceDirPath());
+      if (hasSourceBaseNamespaceDirPath()) {
+        result = result && getSourceBaseNamespaceDirPath()
+            .equals(other.getSourceBaseNamespaceDirPath());
+      }
+      result = result && (hasSourceHFileArchiveDirPath() == other.hasSourceHFileArchiveDirPath());
+      if (hasSourceHFileArchiveDirPath()) {
+        result = result && getSourceHFileArchiveDirPath()
+            .equals(other.getSourceHFileArchiveDirPath());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -17120,6 +17349,18 @@ public final class AdminProtos {
         hash = (37 * hash) + ENTRY_FIELD_NUMBER;
         hash = (53 * hash) + getEntryList().hashCode();
       }
+      if (hasReplicationClusterId()) {
+        hash = (37 * hash) + REPLICATIONCLUSTERID_FIELD_NUMBER;
+        hash = (53 * hash) + getReplicationClusterId().hashCode();
+      }
+      if (hasSourceBaseNamespaceDirPath()) {
+        hash = (37 * hash) + SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER;
+        hash = (53 * hash) + getSourceBaseNamespaceDirPath().hashCode();
+      }
+      if (hasSourceHFileArchiveDirPath()) {
+        hash = (37 * hash) + SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER;
+        hash = (53 * hash) + getSourceHFileArchiveDirPath().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -17243,6 +17484,12 @@ public final class AdminProtos {
         } else {
           entryBuilder_.clear();
         }
+        replicationClusterId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        sourceBaseNamespaceDirPath_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        sourceHFileArchiveDirPath_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -17270,6 +17517,7 @@ public final class AdminProtos {
       public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest buildPartial() {
         org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest result = new org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (entryBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             entry_ = java.util.Collections.unmodifiableList(entry_);
@@ -17279,6 +17527,19 @@ public final class AdminProtos {
         } else {
           result.entry_ = entryBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.replicationClusterId_ = replicationClusterId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.sourceBaseNamespaceDirPath_ = sourceBaseNamespaceDirPath_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.sourceHFileArchiveDirPath_ = sourceHFileArchiveDirPath_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -17320,6 +17581,21 @@ public final class AdminProtos {
             }
           }
         }
+        if (other.hasReplicationClusterId()) {
+          bitField0_ |= 0x00000002;
+          replicationClusterId_ = other.replicationClusterId_;
+          onChanged();
+        }
+        if (other.hasSourceBaseNamespaceDirPath()) {
+          bitField0_ |= 0x00000004;
+          sourceBaseNamespaceDirPath_ = other.sourceBaseNamespaceDirPath_;
+          onChanged();
+        }
+        if (other.hasSourceHFileArchiveDirPath()) {
+          bitField0_ |= 0x00000008;
+          sourceHFileArchiveDirPath_ = other.sourceHFileArchiveDirPath_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -17593,6 +17869,228 @@ public final class AdminProtos {
         return entryBuilder_;
       }
 
+      // optional string replicationClusterId = 2;
+      private java.lang.Object replicationClusterId_ = "";
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public boolean hasReplicationClusterId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public java.lang.String getReplicationClusterId() {
+        java.lang.Object ref = replicationClusterId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          replicationClusterId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getReplicationClusterIdBytes() {
+        java.lang.Object ref = replicationClusterId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          replicationClusterId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder setReplicationClusterId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationClusterId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder clearReplicationClusterId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        replicationClusterId_ = getDefaultInstance().getReplicationClusterId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder setReplicationClusterIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationClusterId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional string sourceBaseNamespaceDirPath = 3;
+      private java.lang.Object sourceBaseNamespaceDirPath_ = "";
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public boolean hasSourceBaseNamespaceDirPath() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public java.lang.String getSourceBaseNamespaceDirPath() {
+        java.lang.Object ref = sourceBaseNamespaceDirPath_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          sourceBaseNamespaceDirPath_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public com.google.protobuf.ByteString
+          getSourceBaseNamespaceDirPathBytes() {
+        java.lang.Object ref = sourceBaseNamespaceDirPath_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          sourceBaseNamespaceDirPath_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder setSourceBaseNamespaceDirPath(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        sourceBaseNamespaceDirPath_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder clearSourceBaseNamespaceDirPath() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        sourceBaseNamespaceDirPath_ = getDefaultInstance().getSourceBaseNamespaceDirPath();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder setSourceBaseNamespaceDirPathBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        sourceBaseNamespaceDirPath_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional string sourceHFileArchiveDirPath = 4;
+      private java.lang.Object sourceHFileArchiveDirPath_ = "";
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public boolean hasSourceHFileArchiveDirPath() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public java.lang.String getSourceHFileArchiveDirPath() {
+        java.lang.Object ref = sourceHFileArchiveDirPath_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          sourceHFileArchiveDirPath_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public com.google.protobuf.ByteString
+          getSourceHFileArchiveDirPathBytes() {
+        java.lang.Object ref = sourceHFileArchiveDirPath_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          sourceHFileArchiveDirPath_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder setSourceHFileArchiveDirPath(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        sourceHFileArchiveDirPath_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder clearSourceHFileArchiveDirPath() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        sourceHFileArchiveDirPath_ = getDefaultInstance().getSourceHFileArchiveDirPath();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder setSourceHFileArchiveDirPathBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        sourceHFileArchiveDirPath_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicateWALEntryRequest)
     }
 
@@ -23539,56 +24037,58 @@ public final class AdminProtos {
       "ster_system_time\030\004 \001(\004\"\026\n\024MergeRegionsRe" +
       "sponse\"a\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.hbase." +
       "pb.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
-      "sociated_cell_count\030\003 \001(\005\"=\n\030ReplicateWA" +
-      "LEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb." +
-      "WALEntry\"\033\n\031ReplicateWALEntryResponse\"\026\n" +
-      "\024RollWALWriterRequest\"0\n\025RollWALWriterRe" +
-      "sponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopS" +
-      "erverRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServ" +
-      "erResponse\"\026\n\024GetServerInfoRequest\"K\n\nSe" +
-      "rverInfo\022)\n\013server_name\030\001 \002(\0132\024.hbase.pb" +
-      ".ServerName\022\022\n\nwebui_port\030\002 \001(\r\"B\n\025GetSe" +
-      "rverInfoResponse\022)\n\013server_info\030\001 \002(\0132\024.",
-      "hbase.pb.ServerInfo\"\034\n\032UpdateConfigurati" +
-      "onRequest\"\035\n\033UpdateConfigurationResponse" +
-      "2\207\013\n\014AdminService\022P\n\rGetRegionInfo\022\036.hba" +
-      "se.pb.GetRegionInfoRequest\032\037.hbase.pb.Ge" +
-      "tRegionInfoResponse\022M\n\014GetStoreFile\022\035.hb" +
-      "ase.pb.GetStoreFileRequest\032\036.hbase.pb.Ge" +
-      "tStoreFileResponse\022V\n\017GetOnlineRegion\022 ." +
-      "hbase.pb.GetOnlineRegionRequest\032!.hbase." +
-      "pb.GetOnlineRegionResponse\022G\n\nOpenRegion" +
-      "\022\033.hbase.pb.OpenRegionRequest\032\034.hbase.pb",
-      ".OpenRegionResponse\022M\n\014WarmupRegion\022\035.hb" +
-      "ase.pb.WarmupRegionRequest\032\036.hbase.pb.Wa" +
-      "rmupRegionResponse\022J\n\013CloseRegion\022\034.hbas" +
-      "e.pb.CloseRegionRequest\032\035.hbase.pb.Close" +
-      "RegionResponse\022J\n\013FlushRegion\022\034.hbase.pb" +
-      ".FlushRegionRequest\032\035.hbase.pb.FlushRegi" +
-      "onResponse\022J\n\013SplitRegion\022\034.hbase.pb.Spl" +
-      "itRegionRequest\032\035.hbase.pb.SplitRegionRe" +
-      "sponse\022P\n\rCompactRegion\022\036.hbase.pb.Compa" +
-      "ctRegionRequest\032\037.hbase.pb.CompactRegion",
-      "Response\022M\n\014MergeRegions\022\035.hbase.pb.Merg" +
-      "eRegionsRequest\032\036.hbase.pb.MergeRegionsR" +
-      "esponse\022\\\n\021ReplicateWALEntry\022\".hbase.pb." +
-      "ReplicateWALEntryRequest\032#.hbase.pb.Repl" +
-      "icateWALEntryResponse\022Q\n\006Replay\022\".hbase." +
-      "pb.ReplicateWALEntryRequest\032#.hbase.pb.R" +
-      "eplicateWALEntryResponse\022P\n\rRollWALWrite" +
-      "r\022\036.hbase.pb.RollWALWriterRequest\032\037.hbas" +
-      "e.pb.RollWALWriterResponse\022P\n\rGetServerI" +
-      "nfo\022\036.hbase.pb.GetServerInfoRequest\032\037.hb",
-      "ase.pb.GetServerInfoResponse\022G\n\nStopServ" +
-      "er\022\033.hbase.pb.StopServerRequest\032\034.hbase." +
-      "pb.StopServerResponse\022_\n\022UpdateFavoredNo" +
-      "des\022#.hbase.pb.UpdateFavoredNodesRequest" +
-      "\032$.hbase.pb.UpdateFavoredNodesResponse\022b" +
-      "\n\023UpdateConfiguration\022$.hbase.pb.UpdateC" +
-      "onfigurationRequest\032%.hbase.pb.UpdateCon" +
-      "figurationResponseBA\n*org.apache.hadoop." +
-      "hbase.protobuf.generatedB\013AdminProtosH\001\210" +
-      "\001\001\240\001\001"
+      "sociated_cell_count\030\003 \001(\005\"\242\001\n\030ReplicateW" +
+      "ALEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb" +
+      ".WALEntry\022\034\n\024replicationClusterId\030\002 \001(\t\022" +
+      "\"\n\032sourceBaseNamespaceDirPath\030\003 \001(\t\022!\n\031s" +
+      "ourceHFileArchiveDirPath\030\004 \001(\t\"\033\n\031Replic" +
+      "ateWALEntryResponse\"\026\n\024RollWALWriterRequ" +
+      "est\"0\n\025RollWALWriterResponse\022\027\n\017region_t" +
+      "o_flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006re" +
+      "ason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetS" +
+      "erverInfoRequest\"K\n\nServerInfo\022)\n\013server",
+      "_name\030\001 \002(\0132\024.hbase.pb.ServerName\022\022\n\nweb" +
+      "ui_port\030\002 \001(\r\"B\n\025GetServerInfoResponse\022)" +
+      "\n\013server_info\030\001 \002(\0132\024.hbase.pb.ServerInf" +
+      "o\"\034\n\032UpdateConfigurationRequest\"\035\n\033Updat" +
+      "eConfigurationResponse2\207\013\n\014AdminService\022" +
+      "P\n\rGetRegionInfo\022\036.hbase.pb.GetRegionInf" +
+      "oRequest\032\037.hbase.pb.GetRegionInfoRespons" +
+      "e\022M\n\014GetStoreFile\022\035.hbase.pb.GetStoreFil" +
+      "eRequest\032\036.hbase.pb.GetStoreFileResponse" +
+      "\022V\n\017GetOnlineRegion\022 .hbase.pb.GetOnline",
+      "RegionRequest\032!.hbase.pb.GetOnlineRegion" +
+      "Response\022G\n\nOpenRegion\022\033.hbase.pb.OpenRe" +
+      "gionRequest\032\034.hbase.pb.OpenRegionRespons" +
+      "e\022M\n\014WarmupRegion\022\035.hbase.pb.WarmupRegio" +
+      "nRequest\032\036.hbase.pb.WarmupRegionResponse" +
+      "\022J\n\013CloseRegion\022\034.hbase.pb.CloseRegionRe" +
+      "quest\032\035.hbase.pb.CloseRegionResponse\022J\n\013" +
+      "FlushRegion\022\034.hbase.pb.FlushRegionReques" +
+      "t\032\035.hbase.pb.FlushRegionResponse\022J\n\013Spli" +
+      "tRegion\022\034.hbase.pb.SplitRegionRequest\032\035.",
+      "hbase.pb.SplitRegionResponse\022P\n\rCompactR" +
+      "egion\022\036.hbase.pb.CompactRegionRequest\032\037." +
+      "hbase.pb.CompactRegionResponse\022M\n\014MergeR" +
+      "egions\022\035.hbase.pb.MergeRegionsRequest\032\036." +
+      "hbase.pb.MergeRegionsResponse\022\\\n\021Replica" +
+      "teWALEntry\022\".hbase.pb.ReplicateWALEntryR" +
+      "equest\032#.hbase.pb.ReplicateWALEntryRespo" +
+      "nse\022Q\n\006Replay\022\".hbase.pb.ReplicateWALEnt" +
+      "ryRequest\032#.hbase.pb.ReplicateWALEntryRe" +
+      "sponse\022P\n\rRollWALWriter\022\036.hbase.pb.RollW",
+      "ALWriterRequest\032\037.hbase.pb.RollWALWriter" +
+      "Response\022P\n\rGetServerInfo\022\036.hbase.pb.Get" +
+      "ServerInfoRequest\032\037.hbase.pb.GetServerIn" +
+      "foResponse\022G\n\nStopServer\022\033.hbase.pb.Stop" +
+      "ServerRequest\032\034.hbase.pb.StopServerRespo" +
+      "nse\022_\n\022UpdateFavoredNodes\022#.hbase.pb.Upd" +
+      "ateFavoredNodesRequest\032$.hbase.pb.Update" +
+      "FavoredNodesResponse\022b\n\023UpdateConfigurat" +
+      "ion\022$.hbase.pb.UpdateConfigurationReques" +
+      "t\032%.hbase.pb.UpdateConfigurationResponse",
+      "BA\n*org.apache.hadoop.hbase.protobuf.gen" +
+      "eratedB\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23750,7 +24250,7 @@ public final class AdminProtos {
           internal_static_hbase_pb_ReplicateWALEntryRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicateWALEntryRequest_descriptor,
-              new java.lang.String[] { "Entry", });
+              new java.lang.String[] { "Entry", "ReplicationClusterId", "SourceBaseNamespaceDirPath", "SourceHFileArchiveDirPath", });
           internal_static_hbase_pb_ReplicateWALEntryResponse_descriptor =
             getDescriptor().getMessageTypes().get(24);
           internal_static_hbase_pb_ReplicateWALEntryResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index f7787f5..a1905a4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -211,6 +211,9 @@ message WALEntry {
  */
 message ReplicateWALEntryRequest {
   repeated WALEntry entry = 1;
+  optional string replicationClusterId = 2;
+  optional string sourceBaseNamespaceDirPath = 3;
+  optional string sourceHFileArchiveDirPath = 4;
 }
 
 message ReplicateWALEntryResponse {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 44be2d3..369ae90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -125,6 +126,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private FsDelegationToken fsDelegationToken;
   private String bulkToken;
   private UserProvider userProvider;
+  private int nrThreads;
 
   private LoadIncrementalHFiles() {}
 
@@ -146,6 +148,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+      Runtime.getRuntime().availableProcessors());
     initalized = true;
   }
 
@@ -246,7 +250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  static class LoadQueueItem {
+  public static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
@@ -313,7 +317,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
-  @SuppressWarnings("deprecation")
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException  {
 
@@ -321,16 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw new TableNotFoundException("Table " + table.getName() + "is not currently available.");
     }
 
-    // initialize thread pools
-    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
-      Runtime.getRuntime().availableProcessors());
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
-        60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        builder.build());
-    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+    ExecutorService pool = createExecutorService();
 
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
@@ -347,30 +341,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 	    "option, consider removing the files and bulkload again without this option. " +
 	    "See HBASE-13985");
       }
-      discoverLoadQueue(queue, hfofDir, validateHFile);
-      // check whether there is invalid family name in HFiles to be bulkloaded
-      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
-      ArrayList<String> familyNames = new ArrayList<String>(families.size());
-      for (HColumnDescriptor family : families) {
-        familyNames.add(family.getNameAsString());
-      }
-      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
-      Iterator<LoadQueueItem> queueIter = queue.iterator();
-      while (queueIter.hasNext()) {
-        LoadQueueItem lqi = queueIter.next();
-        String familyNameInHFile = Bytes.toString(lqi.family);
-        if (!familyNames.contains(familyNameInHFile)) {
-          unmatchedFamilies.add(familyNameInHFile);
-        }
-      }
-      if (unmatchedFamilies.size() > 0) {
-        String msg =
-            "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
-                + unmatchedFamilies + "; valid family names of table "
-                + table.getName() + " are: " + familyNames;
-        LOG.error(msg);
-        throw new IOException(msg);
-      }
+      prepareHFileQueue(hfofDir, table, queue, validateHFile);
+
       int count = 0;
 
       if (queue.isEmpty()) {
@@ -397,7 +369,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               + count + " with " + queue.size() + " files remaining to group or split");
         }
 
-        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
         maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
         if (maxRetries != 0 && count >= maxRetries) {
           throw new IOException("Retry attempted " + count +
@@ -447,6 +419,85 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile) throws IOException {
+    discoverLoadQueue(queue, hfofDir, validateHFile);
+    validateFamiliesInHFiles(table, queue);
+  }
+
+  // Initialize a thread pool
+  private ExecutorService createExecutorService() {
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), builder.build());
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+   */
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+      throws IOException {
+    Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+    List<String> familyNames = new ArrayList<String>(families.size());
+    for (HColumnDescriptor family : families) {
+      familyNames.add(family.getNameAsString());
+    }
+    List<String> unmatchedFamilies = new ArrayList<String>();
+    Iterator<LoadQueueItem> queueIter = queue.iterator();
+    while (queueIter.hasNext()) {
+      LoadQueueItem lqi = queueIter.next();
+      String familyNameInHFile = Bytes.toString(lqi.family);
+      if (!familyNames.contains(familyNameInHFile)) {
+        unmatchedFamilies.add(familyNameInHFile);
+      }
+    }
+    if (unmatchedFamilies.size() > 0) {
+      String msg =
+          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+              + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+              + familyNames;
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+   * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
+   * {@link
+   * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+   * @param table Table to which these hfiles should be loaded to
+   * @param conn Connection to use
+   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+   */
+  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    ExecutorService pool = null;
+    try {
+      pool = createExecutorService();
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+          groupOrSplitPhase(table, pool, queue, startEndKeys);
+      bulkLoadPhase(table, conn, pool, queue, regionGroups);
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
+    }
+  }
+
+  /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load
    * them.  Any failures are re-queued for another pass with the
    * groupOrSplitPhase.
@@ -592,10 +643,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     String uniqueName = getUniqueName();
     HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+
     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
     Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
-        botOut, topOut);
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
 
     FileSystem fs = tmpDir.getFileSystem(getConf());
     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -626,6 +677,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       final Pair<byte[][], byte[][]> startEndKeys)
       throws IOException {
     final Path hfilePath = item.hfilePath;
+    // fs is the source filesystem
+    if (fs == null) {
+      fs = hfilePath.getFileSystem(getConf());
+    }
     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
         new CacheConfig(getConf()), getConf());
     final byte[] first, last;
@@ -712,7 +767,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * failure
    */
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
     final List<Pair<byte[], String>> famPaths =
       new ArrayList<Pair<byte[], String>>(lqis.size());
@@ -747,6 +802,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           //in user directory
           if(secureClient != null && !success) {
             FileSystem targetFs = FileSystem.get(getConf());
+         // fs is the source filesystem
+            if(fs == null) {
+              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+            }
             // Check to see if the source and target filesystems are the same
             // If they are the same filesystem, we will try move the files back
             // because previously we moved them to the staging directory.
@@ -1000,4 +1059,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     System.exit(ret);
   }
 
+  /**
+   * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
+   * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+   * property. This directory is used as a temporary directory where all files are initially
+   * copied/moved from user given directory, set all the required file permissions and then from
+   * their it is finally loaded into a table. This should be set only when, one would like to manage
+   * the staging directory by itself. Otherwise this tool will handle this by itself.
+   * @param stagingDir staging directory path
+   */
+  public void setBulkToken(String stagingDir) {
+    this.bulkToken = stagingDir;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d6a120b..91185af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -28,22 +28,23 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
 
 import com.google.protobuf.ServiceException;
 
@@ -51,15 +52,20 @@ import com.google.protobuf.ServiceException;
 public class ReplicationProtbufUtil {
   /**
    * A helper to replicate a list of WAL entries using admin protocol.
-   *
-   * @param admin
-   * @param entries
+   * @param admin Admin service
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    * @throws java.io.IOException
    */
   public static void replicateWALEntry(final AdminService.BlockingInterface admin,
-      final Entry[] entries) throws IOException {
+      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
+      Path sourceHFileArchiveDir) throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-      buildReplicateWALEntryRequest(entries, null);
+        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
+          sourceHFileArchiveDir);
     PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
     try {
       admin.replicateWALEntry(controller, p.getFirst());
@@ -77,19 +83,22 @@ public class ReplicationProtbufUtil {
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
       buildReplicateWALEntryRequest(final Entry[] entries) {
-    return buildReplicateWALEntryRequest(entries, null);
+    return buildReplicateWALEntryRequest(entries, null, null, null, null);
   }
 
   /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
-   *
    * @param entries the WAL entries to be replicated
    * @param encodedRegionName alternative region name to use if not null
-   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
-   * found.
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
+      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
+          String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
     // Accumulate all the Cells seen in here.
     List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
     int size = 0;
@@ -146,6 +155,17 @@ public class ReplicationProtbufUtil {
       entryBuilder.setAssociatedCellCount(cells.size());
       builder.addEntry(entryBuilder.build());
     }
+
+    if (replicationClusterId != null) {
+      builder.setReplicationClusterId(replicationClusterId);
+    }
+    if (sourceBaseNamespaceDir != null) {
+      builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
+    }
+    if (sourceHFileArchiveDir != null) {
+      builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
+    }
+
     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
       getCellScanner(allCells, size));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d94e11c..0c9b0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1800,7 +1800,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         List<WALEntry> entries = request.getEntryList();
         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
-        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
+        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
+          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+          request.getSourceHFileArchiveDirPath());
         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
         return ReplicateWALEntryResponse.newBuilder().build();
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 5f96bf7..836d3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService {
    * Carry on the list of log entries down to the sink
    * @param entries list of WALEntries to replicate
    * @param cells Cells that the WALEntries refer to (if cells is non-null)
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
+   *          directory required for replicating hfiles
+   * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
    * @throws IOException
    */
-  void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+  void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId,
+      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException;
 }


Mime
View raw message