hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject hbase git commit: HBASE-16868 Add a replicate_all flag to avoid misuse the namespaces and table-cfs config of replication peer
Date Thu, 23 Nov 2017 07:17:24 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 135bb5583 -> ca6e7e68f


HBASE-16868 Add a replicate_all flag to avoid misuse the namespaces and table-cfs config of replication peer

Signed-off-by: Guanghao Zhang <zghao@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca6e7e68
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca6e7e68
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca6e7e68

Branch: refs/heads/branch-2
Commit: ca6e7e68f4468f0c4c8e3abf1922e5ed07262cae
Parents: 135bb55
Author: Guanghao Zhang <zghao@apache.org>
Authored: Thu Nov 23 14:54:19 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Thu Nov 23 15:08:03 2017 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfigUtil.java  |  10 +-
 .../replication/ReplicationPeerConfig.java      |  12 ++
 .../src/main/protobuf/Replication.proto         |   1 +
 .../replication/ReplicationPeersZKImpl.java     |   1 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   6 +-
 .../master/replication/ReplicationManager.java  |  25 ++-
 .../NamespaceTableCfWALEntryFilter.java         | 103 ++++++-----
 .../master/ReplicationPeerConfigUpgrader.java   | 184 +++++++++++++++++++
 .../replication/master/TableCFsUpdater.java     | 151 ---------------
 .../client/TestAsyncReplicationAdminApi.java    |   9 +
 ...estAsyncReplicationAdminApiWithClusters.java |   2 +
 .../replication/TestReplicationAdmin.java       | 116 ++++++++++--
 .../TestReplicationAdminWithClusters.java       |  12 +-
 .../replication/TestMasterReplication.java      |   8 +-
 .../replication/TestNamespaceReplication.java   |   6 +-
 .../replication/TestPerTableCFReplication.java  |   4 +-
 .../TestReplicationWALEntryFilters.java         |  69 +++++--
 .../replication/master/TestTableCFsUpdater.java |   4 +-
 .../src/main/ruby/hbase/replication_admin.rb    |   9 +
 hbase-shell/src/main/ruby/shell.rb              |   1 +
 .../src/main/ruby/shell/commands/list_peers.rb  |   5 +-
 .../shell/commands/set_peer_replicate_all.rb    |  54 ++++++
 .../test/ruby/hbase/replication_admin_test.rb   |  95 +++++++---
 23 files changed, 602 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index be468ae..52a3c93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -286,6 +286,7 @@ public final class ReplicationPeerConfigUtil {
     if (tableCFsMap != null) {
       peerConfig.setTableCFsMap(tableCFsMap);
     }
+
     List<ByteString> namespacesList = peer.getNamespacesList();
     if (namespacesList != null && namespacesList.size() != 0) {
       Set<String> namespaces = new HashSet<>();
@@ -294,9 +295,15 @@ public final class ReplicationPeerConfigUtil {
       }
       peerConfig.setNamespaces(namespaces);
     }
+
     if (peer.hasBandwidth()) {
       peerConfig.setBandwidth(peer.getBandwidth());
     }
+
+    if (peer.hasReplicateAll()) {
+      peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
+    }
+
     return peerConfig;
   }
 
@@ -338,6 +345,7 @@ public final class ReplicationPeerConfigUtil {
     }
 
     builder.setBandwidth(peerConfig.getBandwidth());
+    builder.setReplicateAll(peerConfig.replicateAllUserTables());
     return builder.build();
   }
 
@@ -465,4 +473,4 @@ public final class ReplicationPeerConfigUtil {
 
     return otherConf;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 4d429c9..9e20829 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -42,6 +42,8 @@ public class ReplicationPeerConfig {
   private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
   private Set<String> namespaces = null;
   private long bandwidth = 0;
+  // Default value is true, means replicate all user tables to peer cluster.
+  private boolean replicateAllUserTables = true;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -110,10 +112,20 @@ public class ReplicationPeerConfig {
     return this;
   }
 
+  public boolean replicateAllUserTables() {
+    return this.replicateAllUserTables;
+  }
+
+  public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
+    this.replicateAllUserTables = replicateAllUserTables;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
+    builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
     if (namespaces != null) {
       builder.append("namespaces=").append(namespaces.toString()).append(",");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 88efa00..a1a7ade 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -45,6 +45,7 @@ message ReplicationPeer {
   repeated TableCF table_cfs = 5;
   repeated bytes namespaces = 6;
   optional int64 bandwidth = 7;
+  optional bool replicate_all = 8;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index b7564f4..2c3bbd5 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -368,6 +368,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
     existingConfig.setNamespaces(newConfig.getNamespaces());
     existingConfig.setBandwidth(newConfig.getBandwidth());
+    existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 97982b9..fcaf55f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -156,7 +156,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
-import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
+import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -798,9 +798,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     // This is for backwards compatibility
     // See HBASE-11393
     status.setStatus("Update TableCFs node in ZNode");
-    TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
+    ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper,
             conf, this.clusterConnection);
-    tableCFsUpdater.update();
+    tableCFsUpdater.copyTableCFs();
 
     // Add the Observer to delete space quotas on table deletion before starting all CPs by
     // default with quota support, avoiding if user specifically asks to not load this Observer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
index 3615992..f2a6c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -71,9 +71,7 @@ public class ReplicationManager {
 
   public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException, IOException {
-    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
-      peerConfig.getTableCFsMap());
-    checkConfiguredWALEntryFilters(peerConfig);
+    checkPeerConfig(peerConfig);
     replicationPeers.registerPeer(peerId, peerConfig, enabled);
     replicationPeers.peerConnected(peerId);
   }
@@ -102,9 +100,7 @@ public class ReplicationManager {
 
   public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
       throws ReplicationException, IOException {
-    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
-      peerConfig.getTableCFsMap());
-    checkConfiguredWALEntryFilters(peerConfig);
+    checkPeerConfig(peerConfig);
     this.replicationPeers.updatePeerConfig(peerId, peerConfig);
   }
 
@@ -122,6 +118,21 @@ public class ReplicationManager {
     return peers;
   }
 
+  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
+      IOException {
+    if (peerConfig.replicateAllUserTables()) {
+      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+        throw new ReplicationException(
+          "Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster");
+      }
+    } else {
+      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+        peerConfig.getTableCFsMap());
+    }
+    checkConfiguredWALEntryFilters(peerConfig);
+  }
+
   /**
    * Set a namespace in the peer config means that all tables in this namespace
    * will be replicated to the peer cluster.
@@ -150,8 +161,6 @@ public class ReplicationManager {
             "Table-cfs config conflict with namespaces config in peer");
       }
     }
-
-
   }
 
   private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 5591974..9a4cc6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -58,69 +58,74 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
   public Entry filter(Entry entry) {
     TableName tabName = entry.getKey().getTablename();
     String namespace = tabName.getNamespaceAsString();
-    Set<String> namespaces = this.peer.getNamespaces();
-    Map<TableName, List<String>> tableCFs = getTableCfs();
+    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
 
-    // If null means user has explicitly not configured any namespaces and table CFs
-    // so all the tables data are applicable for replication
-    if (namespaces == null && tableCFs == null) {
+    if (peerConfig.replicateAllUserTables()) {
+      // replicate all user tables, so return entry directly
       return entry;
-    }
+    } else {
+      // Not replicate all user tables, so filter by namespaces and table-cfs config
+      Set<String> namespaces = peerConfig.getNamespaces();
+      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
 
-    // First filter by namespaces config
-    // If table's namespace in peer config, all the tables data are applicable for replication
-    if (namespaces != null && namespaces.contains(namespace)) {
-      return entry;
-    }
+      if (namespaces == null && tableCFs == null) {
+        return null;
+      }
 
-    // Then filter by table-cfs config
-    // return null(prevent replicating) if logKey's table isn't in this peer's
-    // replicaable namespace list and table list
-    if (tableCFs == null || !tableCFs.containsKey(tabName)) {
-      return null;
-    }
+      // First filter by namespaces config
+      // If table's namespace in peer config, all the tables data are applicable for replication
+      if (namespaces != null && namespaces.contains(namespace)) {
+        return entry;
+      }
+
+      // Then filter by table-cfs config
+      // return null(prevent replicating) if logKey's table isn't in this peer's
+      // replicaable namespace list and table list
+      if (tableCFs == null || !tableCFs.containsKey(tabName)) {
+        return null;
+      }
 
-    return entry;
+      return entry;
+    }
   }
 
   @Override
   public Cell filterCell(final Entry entry, Cell cell) {
-    final Map<TableName, List<String>> tableCfs = getTableCfs();
-    if (tableCfs == null) return cell;
-    TableName tabName = entry.getKey().getTablename();
-    List<String> cfs = tableCfs.get(tabName);
-    // ignore(remove) kv if its cf isn't in the replicable cf list
-    // (empty cfs means all cfs of this table are replicable)
-    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
-        @Override
-        public boolean apply(byte[] fam) {
-          if (tableCfs != null) {
-            List<String> cfs = tableCfs.get(entry.getKey().getTablename());
-            if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
-              return true;
+    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
+    if (peerConfig.replicateAllUserTables()) {
+      // replicate all user tables, so return cell directly
+      return cell;
+    } else {
+      final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
+      if (tableCfs == null) {
+        return cell;
+      }
+      TableName tabName = entry.getKey().getTablename();
+      List<String> cfs = tableCfs.get(tabName);
+      // ignore(remove) kv if its cf isn't in the replicable cf list
+      // (empty cfs means all cfs of this table are replicable)
+      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+        cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+          @Override
+          public boolean apply(byte[] fam) {
+            if (tableCfs != null) {
+              List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+              if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+                return true;
+              }
             }
+            return false;
           }
-          return false;
+        });
+      } else {
+        if ((cfs != null)
+            && !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
+              cell.getFamilyLength()))) {
+          return null;
         }
-      });
-    } else {
-      if ((cfs != null) && !cfs.contains(
-        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
-        return null;
       }
-    }
-    return cell;
-  }
 
-  Map<TableName, List<String>> getTableCfs() {
-    Map<TableName, List<String>> tableCFs = null;
-    try {
-      tableCFs = this.peer.getTableCFs();
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
+      return cell;
     }
-    return tableCFs;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
new file mode 100644
index 0000000..5c8fba3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+/**
+ * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
+ * It will be removed in HBase 3.x. See HBASE-11393
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUpgrader.class);
+
+  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
+                         Configuration conf, Abortable abortable) {
+    super(zookeeper, conf, abortable);
+  }
+
+  public void upgrade() throws Exception {
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      Admin admin = conn.getAdmin();
+      admin.listReplicationPeers().forEach(
+        (peerDesc) -> {
+          String peerId = peerDesc.getPeerId();
+          ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
+          if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+              || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+            peerConfig.setReplicateAllUserTables(false);
+            try {
+              admin.updateReplicationPeerConfig(peerId, peerConfig);
+            } catch (Exception e) {
+              LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
+            }
+          }
+        });
+    }
+  }
+
+  public void copyTableCFs() {
+    List<String> znodes = null;
+    try {
+      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      LOG.error("Failed to get peers znode", e);
+    }
+    if (znodes != null) {
+      for (String peerId : znodes) {
+        if (!copyTableCFs(peerId)) {
+          LOG.error("upgrade tableCFs failed for peerId=" + peerId);
+        }
+      }
+    }
+  }
+
+  public boolean copyTableCFs(String peerId) {
+    String tableCFsNode = getTableCFsNode(peerId);
+    try {
+      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
+        String peerNode = getPeerNode(peerId);
+        ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
+        // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
+        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
+          // we copy TableCFs node into PeerNode
+          LOG.info("copy tableCFs into peerNode:" + peerId);
+          ReplicationProtos.TableCF[] tableCFs =
+                  ReplicationPeerConfigUtil.parseTableCFs(
+                          ZKUtil.getData(this.zookeeper, tableCFsNode));
+          if (tableCFs != null && tableCFs.length > 0) {
+            rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
+            ZKUtil.setData(this.zookeeper, peerNode,
+              ReplicationPeerConfigUtil.toByteArray(rpc));
+          }
+        } else {
+          LOG.info("No tableCFs in peerNode:" + peerId);
+        }
+      }
+    } catch (KeeperException e) {
+      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+      return false;
+    } catch (InterruptedException e) {
+      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+      return false;
+    } catch (IOException e) {
+      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+      return false;
+    }
+    return true;
+  }
+
+  private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
+          throws KeeperException, InterruptedException {
+    byte[] data = null;
+    data = ZKUtil.getData(this.zookeeper, peerNode);
+    if (data == null) {
+      LOG.error("Could not get configuration for " +
+              "peer because it doesn't exist. peer=" + peerNode);
+      return null;
+    }
+    try {
+      return ReplicationPeerConfigUtil.parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed to parse cluster key from peer=" + peerNode);
+      return null;
+    }
+  }
+
+  private static void printUsageAndExit() {
+    System.err.printf(
+      "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
+          + " [options]");
+    System.err.println(" where [options] are:");
+    System.err.println("  -h|-help      Show this help and exit.");
+    System.err.println("  copyTableCFs  Copy table-cfs to replication peer config");
+    System.err.println("  upgrade           Upgrade replication peer config to new format");
+    System.err.println();
+    System.exit(1);
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1) {
+      printUsageAndExit();
+    }
+    if (args[0].equals("-help") || args[0].equals("-h")) {
+      printUsageAndExit();
+    } else if (args[0].equals("copyTableCFs")) {
+      Configuration conf = HBaseConfiguration.create();
+      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
+      try {
+        ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw,
+            conf, null);
+        tableCFsUpdater.copyTableCFs();
+      } finally {
+        zkw.close();
+      }
+    } else if (args[0].equals("upgrade")) {
+      Configuration conf = HBaseConfiguration.create();
+      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
+      ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null);
+      upgrader.upgrade();
+    } else {
+      printUsageAndExit();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
deleted file mode 100644
index f442495..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
- * It will be removed in HBase 3.x. See HBASE-11393
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TableCFsUpdater extends ReplicationStateZKBase {
-
-  private static final Log LOG = LogFactory.getLog(TableCFsUpdater.class);
-
-  public TableCFsUpdater(ZKWatcher zookeeper,
-                         Configuration conf, Abortable abortable) {
-    super(zookeeper, conf, abortable);
-  }
-
-  public void update() {
-    List<String> znodes = null;
-    try {
-      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      LOG.error("Failed to get peers znode", e);
-    }
-    if (znodes != null) {
-      for (String peerId : znodes) {
-        if (!update(peerId)) {
-          LOG.error("upgrade tableCFs failed for peerId=" + peerId);
-        }
-      }
-    }
-  }
-
-  public boolean update(String peerId) {
-    String tableCFsNode = getTableCFsNode(peerId);
-    try {
-      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
-        String peerNode = getPeerNode(peerId);
-        ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
-        // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
-        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
-          // we copy TableCFs node into PeerNode
-          LOG.info("copy tableCFs into peerNode:" + peerId);
-          ReplicationProtos.TableCF[] tableCFs =
-                  ReplicationPeerConfigUtil.parseTableCFs(
-                          ZKUtil.getData(this.zookeeper, tableCFsNode));
-          if (tableCFs != null && tableCFs.length > 0) {
-            rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
-            ZKUtil.setData(this.zookeeper, peerNode,
-              ReplicationPeerConfigUtil.toByteArray(rpc));
-          }
-        } else {
-          LOG.info("No tableCFs in peerNode:" + peerId);
-        }
-      }
-    } catch (KeeperException e) {
-      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
-      return false;
-    } catch (InterruptedException e) {
-      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
-      return false;
-    } catch (IOException e) {
-      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
-      return false;
-    }
-    return true;
-  }
-
-  private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
-          throws KeeperException, InterruptedException {
-    byte[] data = null;
-    data = ZKUtil.getData(this.zookeeper, peerNode);
-    if (data == null) {
-      LOG.error("Could not get configuration for " +
-              "peer because it doesn't exist. peer=" + peerNode);
-      return null;
-    }
-    try {
-      return ReplicationPeerConfigUtil.parsePeerFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed to parse cluster key from peer=" + peerNode);
-      return null;
-    }
-  }
-
-  private static void printUsageAndExit() {
-    System.err.printf("Usage: hbase org.apache.hadoop.hbase.replication.master.TableCFsUpdater [options]");
-    System.err.println(" where [options] are:");
-    System.err.println("  -h|-help    Show this help and exit.");
-    System.err.println("  update      Copy table-cfs to replication peer config");
-    System.err.println();
-    System.exit(1);
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 1) {
-      printUsageAndExit();
-    }
-    if (args[0].equals("-help") || args[0].equals("-h")) {
-      printUsageAndExit();
-    } else if (args[0].equals("update")) {
-      Configuration conf = HBaseConfiguration.create();
-      ZKWatcher zkw = new ZKWatcher(conf, "TableCFsUpdater", null);
-      try {
-        TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zkw, conf, null);
-        tableCFsUpdater.update();
-      } finally {
-        zkw.close();
-      }
-    } else {
-      printUsageAndExit();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index e489078..6591826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -149,6 +149,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
     // Add a valid peer
     admin.addReplicationPeer(ID_ONE, rpc1).join();
+    rpc1.setReplicateAllUserTables(false);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
 
     Map<TableName, List<String>> tableCFs = new HashMap<>();
 
@@ -248,6 +250,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
     // Add a valid peer
     admin.addReplicationPeer(ID_ONE, rpc1).join();
+    rpc1.setReplicateAllUserTables(false);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
+
     Map<TableName, List<String>> tableCFs = new HashMap<>();
     try {
       tableCFs.put(tableName3, null);
@@ -328,6 +333,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
     admin.addReplicationPeer(ID_ONE, rpc).join();
+    rpc.setReplicateAllUserTables(false);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
 
     // add ns1 and ns2 to peer config
     rpc = admin.getReplicationPeerConfig(ID_ONE).get();
@@ -364,6 +371,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
     admin.addReplicationPeer(ID_ONE, rpc).join();
+    rpc.setReplicateAllUserTables(false);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
 
     rpc = admin.getReplicationPeerConfig(ID_ONE).get();
     Set<String> namespaces = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
index 4b88bf7..9ceb172 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -218,6 +218,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
     Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
     tableCfs.put(tableName, null);
     ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
+    rpc.setReplicateAllUserTables(false);
     rpc.setTableCFsMap(tableCfs);
     try {
       // Only add tableName to replication peer config
@@ -236,6 +237,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
         admin2.tableExists(tableName2).get());
     } finally {
       rpc.setTableCFsMap(null);
+      rpc.setReplicateAllUserTables(true);
       admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 036706a..19f117b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -85,10 +85,9 @@ public class TestReplicationAdmin {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     TEST_UTIL.startMiniCluster();
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-    admin = new ReplicationAdmin(conf);
+    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
     hbaseAdmin = TEST_UTIL.getAdmin();
   }
 
@@ -238,8 +237,8 @@ public class TestReplicationAdmin {
 
   @Test
   public void testAppendPeerTableCFs() throws Exception {
-    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
-    rpc1.setClusterKey(KEY_ONE);
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
     final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
     final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
     final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
@@ -248,10 +247,14 @@ public class TestReplicationAdmin {
     final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
 
     // Add a valid peer
-    admin.addPeer(ID_ONE, rpc1, null);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
-    Map<TableName, List<String>> tableCFs = new HashMap<>();
+    // Update peer config, not replicate all user tables
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    rpc.setReplicateAllUserTables(false);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
 
+    Map<TableName, List<String>> tableCFs = new HashMap<>();
     tableCFs.put(tableName1, null);
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
     Map<TableName, List<String>> result =
@@ -338,14 +341,21 @@ public class TestReplicationAdmin {
 
   @Test
   public void testRemovePeerTableCFs() throws Exception {
-    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
-    rpc1.setClusterKey(KEY_ONE);
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
     final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
     final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
     final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
     final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+
     // Add a valid peer
-    admin.addPeer(ID_ONE, rpc1, null);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+    // Update peer config, not replicate all user tables
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    rpc.setReplicateAllUserTables(false);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+
     Map<TableName, List<String>> tableCFs = new HashMap<>();
     try {
       tableCFs.put(tableName3, null);
@@ -423,27 +433,98 @@ public class TestReplicationAdmin {
     rpc.setClusterKey(KEY_ONE);
     hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
-    rpc = admin.getPeerConfig(ID_ONE);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    rpc.setReplicateAllUserTables(false);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     Set<String> namespaces = new HashSet<>();
     namespaces.add(ns1);
     namespaces.add(ns2);
     rpc.setNamespaces(namespaces);
-    admin.updatePeerConfig(ID_ONE, rpc);
-    namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
     assertEquals(2, namespaces.size());
     assertTrue(namespaces.contains(ns1));
     assertTrue(namespaces.contains(ns2));
 
-    rpc = admin.getPeerConfig(ID_ONE);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     namespaces.clear();
     namespaces.add(ns1);
     rpc.setNamespaces(namespaces);
-    admin.updatePeerConfig(ID_ONE, rpc);
-    namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
     assertEquals(1, namespaces.size());
     assertTrue(namespaces.contains(ns1));
 
-    admin.removePeer(ID_ONE);
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+  }
+
+  @Test
+  public void testSetReplicateAllUserTables() throws Exception {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    assertTrue(rpc.replicateAllUserTables());
+
+    rpc.setReplicateAllUserTables(false);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    assertFalse(rpc.replicateAllUserTables());
+
+    rpc.setReplicateAllUserTables(true);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    assertTrue(rpc.replicateAllUserTables());
+
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+  }
+
+  @Test
+  public void testPeerConfigConflict() throws Exception {
+    // Default replicate all flag is true
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+
+    String ns1 = "ns1";
+    Set<String> namespaces = new HashSet<String>();
+    namespaces.add(ns1);
+
+    TableName tab1 = TableName.valueOf("ns1:tabl");
+    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(tab1, new ArrayList<String>());
+
+    try {
+      rpc.setNamespaces(namespaces);
+      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+      fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
+    } catch (IOException e) {
+      // OK
+      rpc.setNamespaces(null);
+    }
+
+    try {
+      rpc.setTableCFsMap(tableCfs);
+      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+      fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
+    } catch (IOException e) {
+      // OK
+      rpc.setTableCFsMap(null);
+    }
+
+    try {
+      rpc.setNamespaces(namespaces);
+      rpc.setTableCFsMap(tableCfs);
+      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+      fail("Should throw Exception."
+          + " When replicate all flag is true, no need to config namespaces or table-cfs");
+    } catch (IOException e) {
+      // OK
+      rpc.setNamespaces(null);
+      rpc.setTableCFsMap(null);
+    }
   }
 
   @Test
@@ -455,6 +536,7 @@ public class TestReplicationAdmin {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
+    rpc.setReplicateAllUserTables(false);
     hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
     rpc = admin.getPeerConfig(ID_ONE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 2610313..3b7fd84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -194,7 +194,13 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
       admin2.disableTable(TestReplicationBase.tableName);
       admin2.deleteTable(TestReplicationBase.tableName);
     }
-    assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(TestReplicationBase.tableName));
+    assertFalse("Table should not exists in the peer cluster",
+      admin2.isTableAvailable(TestReplicationBase.tableName));
+
+    // update peer config
+    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
+    rpc.setReplicateAllUserTables(false);
+    admin1.updateReplicationPeerConfig(peerId, rpc);
 
     Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
     tableCfs.put(tableName, null);
@@ -214,6 +220,10 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
     } finally {
       adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
       admin1.disableTableReplication(TestReplicationBase.tableName);
+
+      rpc = admin1.getReplicationPeerConfig(peerId);
+      rpc.setReplicateAllUserTables(true);
+      admin1.updateReplicationPeerConfig(peerId, rpc);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 58b22c8..ac53551 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -524,10 +524,12 @@ public class TestMasterReplication {
 
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-        .getAdmin()) {
-      admin.addReplicationPeer(id,
+    try (Admin admin =
+        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+      admin.addReplicationPeer(
+        id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
+            .setReplicateAllUserTables(false)
             .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index 433a345..0d7a92d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -140,8 +140,12 @@ public class TestNamespaceReplication extends TestReplicationBase {
     Table htab1B = connection1.getTable(tabBName);
     Table htab2B = connection2.getTable(tabBName);
 
-    // add ns1 to peer config which replicate to cluster2
     ReplicationPeerConfig rpc = admin.getPeerConfig("2");
+    rpc.setReplicateAllUserTables(false);
+    admin.updatePeerConfig("2", rpc);
+
+    // add ns1 to peer config which replicate to cluster2
+    rpc = admin.getPeerConfig("2");
     Set<String> namespaces = new HashSet<>();
     namespaces.add(ns1);
     rpc.setNamespaces(namespaces);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index 84ce9a3..e9c352d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -404,6 +404,7 @@ public class TestPerTableCFReplication {
       // A. add cluster2/cluster3 as peers to cluster1
       ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
       rpc2.setClusterKey(utility2.getClusterKey());
+      rpc2.setReplicateAllUserTables(false);
       Map<TableName, List<String>> tableCFs = new HashMap<>();
       tableCFs.put(tabCName, null);
       tableCFs.put(tabBName, new ArrayList<>());
@@ -413,6 +414,7 @@ public class TestPerTableCFReplication {
 
       ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
       rpc3.setClusterKey(utility3.getClusterKey());
+      rpc3.setReplicateAllUserTables(false);
       tableCFs.clear();
       tableCFs.put(tabAName, null);
       tableCFs.put(tabBName, new ArrayList<>());
@@ -518,7 +520,7 @@ public class TestPerTableCFReplication {
       connection2.close();
       connection3.close();
     }
- }
+  }
 
   private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
     Get get = new Get(row);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index be65576..9fda8bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -202,19 +202,31 @@ public class TestReplicationWALEntryFilters {
   @Test
   public void testNamespaceTableCfWALEntryFilter() {
     ReplicationPeer peer = mock(ReplicationPeer.class);
+    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
 
-    // 1. no namespaces config and table-cfs config in peer
-    when(peer.getNamespaces()).thenReturn(null);
-    when(peer.getTableCFs()).thenReturn(null);
+    // 1. replicate all user tables
+    when(peerConfig.replicateAllUserTables()).thenReturn(true);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     Entry userEntry = createEntry(null, a, b, c);
-    WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
-    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
+    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // 2. not replicate all user tables, no namespaces and table-cfs config
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(null);
+    when(peerConfig.getTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
 
-    // 2. Only config table-cfs in peer
+    // 3. Only config table-cfs in peer
     // empty map
     userEntry = createEntry(null, a, b, c);
     Map<TableName, List<String>> tableCfs = new HashMap<>();
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
@@ -222,7 +234,9 @@ public class TestReplicationWALEntryFilters {
     userEntry = createEntry(null, a, b, c);
     tableCfs = new HashMap<>();
     tableCfs.put(TableName.valueOf("bar"), null);
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
@@ -230,7 +244,9 @@ public class TestReplicationWALEntryFilters {
     userEntry = createEntry(null, a, b, c);
     tableCfs = new HashMap<>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a), filter.filter(userEntry));
 
@@ -238,7 +254,9 @@ public class TestReplicationWALEntryFilters {
     userEntry = createEntry(null, a, b, c, d);
     tableCfs = new HashMap<>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,c), filter.filter(userEntry));
 
@@ -246,14 +264,19 @@ public class TestReplicationWALEntryFilters {
     when(peer.getTableCFs()).thenReturn(null);
     // empty set
     Set<String> namespaces = new HashSet<>();
-    when(peer.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
     // namespace default
     namespaces.add("default");
-    when(peer.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@@ -261,7 +284,9 @@ public class TestReplicationWALEntryFilters {
     // namespace ns1
     namespaces = new HashSet<>();
     namespaces.add("ns1");
-    when(peer.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
@@ -271,9 +296,11 @@ public class TestReplicationWALEntryFilters {
     namespaces = new HashSet<>();
     tableCfs = new HashMap<>();
     namespaces.add("ns1");
-    when(peer.getNamespaces()).thenReturn(namespaces);
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@@ -281,9 +308,11 @@ public class TestReplicationWALEntryFilters {
     namespaces = new HashSet<>();
     tableCfs = new HashMap<>();
     namespaces.add("default");
-    when(peer.getNamespaces()).thenReturn(namespaces);
     tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -291,9 +320,11 @@ public class TestReplicationWALEntryFilters {
     namespaces = new HashSet<>();
     tableCfs = new HashMap<>();
     namespaces.add("ns1");
-    when(peer.getNamespaces()).thenReturn(namespaces);
     tableCfs.put(TableName.valueOf("bar"), null);
-    when(peer.getTableCFs()).thenReturn(tableCfs);
+    when(peerConfig.replicateAllUserTables()).thenReturn(false);
+    when(peerConfig.getNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
     userEntry = createEntry(null, a, b, c);
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index cb895ca..e78abfb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -50,7 +50,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @Category({ReplicationTests.class, SmallTests.class})
-public class TestTableCFsUpdater extends TableCFsUpdater {
+public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
 
   private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -164,7 +164,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
     assertNull(actualRpc.getTableCFsMap());
     assertNull(actualTableCfs);
 
-    update();
+    copyTableCFs();
 
     peerId = "1";
     peerNode = getPeerNode(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 3f64356..50c086a 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -92,6 +92,7 @@ module Hbase
           namespaces.each do |n|
             ns_set.add(n)
           end
+          replication_peer_config.setReplicateAllUserTables(false)
           replication_peer_config.set_namespaces(ns_set)
         end
 
@@ -101,6 +102,7 @@ module Hbase
           table_cfs.each do |key, val|
             map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
           end
+          replication_peer_config.setReplicateAllUserTables(false)
           replication_peer_config.set_table_cfs_map(map)
         end
 
@@ -265,6 +267,13 @@ module Hbase
       end
     end
 
+    def set_peer_replicate_all(id, replicate_all)
+      rpc = @replication_admin.getPeerConfig(id)
+      return if rpc.nil?
+      rpc.setReplicateAllUserTables(replicate_all)
+      @replication_admin.updatePeerConfig(id, rpc)
+    end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 687af12..60ca229 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -377,6 +377,7 @@ Shell.load_command_group(
     list_peers
     enable_peer
     disable_peer
+    set_peer_replicate_all
     set_peer_namespaces
     append_peer_namespaces
     remove_peer_namespaces

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 04453c2..6812df4 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,7 +33,7 @@ EOF
         peers = replication_admin.list_peers
 
         formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
-                            STATE NAMESPACES TABLE_CFS BANDWIDTH])
+                            STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH])
 
         peers.each do |peer|
           id = peer.getPeerId
@@ -42,7 +42,8 @@ EOF
           namespaces = replication_admin.show_peer_namespaces(config)
           tableCFs = replication_admin.show_peer_tableCFs(id)
           formatter.row([id, config.getClusterKey,
-                         config.getReplicationEndpointImpl, state, namespaces, tableCFs,
+                         config.getReplicationEndpointImpl, state,
+                         config.replicateAllUserTables, namespaces, tableCFs,
                          config.getBandwidth])
         end
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
new file mode 100644
index 0000000..f6de615
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
@@ -0,0 +1,54 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerReplicateAll < Command
+      def help
+        <<-EOF
+  Set the replicate_all flag to true or false for the specified peer.
+
+  If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
+  will be replicate to peer cluster.
+
+  If replicate_all flag is false, then all user tables cannot be replicate to
+  peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
+  to set which namespaces will be replicated to peer cluster. And you can use
+  'set_peer_tableCFs' or 'append_peer_tableCFs' to set which tables will be
+  replicated to peer cluster.
+
+  Notice: When you want to change a peer's replicate_all flag from false to true,
+          you need clean the peer's NAMESPACES and TABLECFS config firstly.
+
+  Examples:
+
+    # set replicate_all flag to true
+    hbase> set_peer_replicate_all '1', true
+    # set replicate_all flag to false
+    hbase> set_peer_replicate_all '1', false
+EOF
+      end
+
+      def command(id, replicate_all)
+        replication_admin.set_peer_replicate_all(id, replicate_all)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca6e7e68/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 75f3c04..4b74ada 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -73,8 +73,10 @@ module Hbase
       command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+      assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -86,8 +88,10 @@ module Hbase
       command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+      assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -131,8 +135,10 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+      assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -147,11 +153,13 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      peer_config = command(:list_peers).get(0).getPeerConfig
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      peer_config = peer.getPeerConfig
+      assert_equal(false, peer_config.replicateAllUserTables)
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -169,8 +177,10 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      peer_config = command(:list_peers).get(0).getPeerConfig
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      peer_config = peer.getPeerConfig
+      assert_equal(false, peer_config.replicateAllUserTables)
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
@@ -203,9 +213,11 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
-      assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap())
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+      assert_tablecfs_equal(table_cfs, peer.getPeerConfig.getTableCFsMap)
+      assert_equal(false, peer.getPeerConfig.replicateAllUserTables)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -225,10 +237,12 @@ module Hbase
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       args = { CLUSTER_KEY => cluster_key}
       command(:add_peer, @peer_id, args)
+      command(:set_peer_replicate_all, @peer_id, false)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
       command(:set_peer_tableCFs, @peer_id, table_cfs)
@@ -242,10 +256,12 @@ module Hbase
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       args = { CLUSTER_KEY => cluster_key}
       command(:add_peer, @peer_id, args)
+      command(:set_peer_replicate_all, @peer_id, false)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
       command(:append_peer_tableCFs, @peer_id, table_cfs)
@@ -266,8 +282,9 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
       command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] })
@@ -284,6 +301,7 @@ module Hbase
 
       args = { CLUSTER_KEY => cluster_key }
       command(:add_peer, @peer_id, args)
+      command(:set_peer_replicate_all, @peer_id, false)
 
       command(:set_peer_namespaces, @peer_id, namespaces)
 
@@ -291,7 +309,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -304,6 +322,7 @@ module Hbase
 
       args = { CLUSTER_KEY => cluster_key }
       command(:add_peer, @peer_id, args)
+      command(:set_peer_replicate_all, @peer_id, false)
 
       command(:append_peer_namespaces, @peer_id, namespaces)
 
@@ -311,7 +330,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       namespaces = ["ns3"]
       namespaces_str = "ns1;ns2;ns3"
@@ -321,7 +340,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # append a namespace which is already in the peer config
       command(:append_peer_namespaces, @peer_id, namespaces)
@@ -330,7 +349,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -351,7 +370,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       namespaces = ["ns3"]
       namespaces_str = nil
@@ -361,7 +380,7 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # remove a namespace which is not in peer config
       command(:remove_peer_namespaces, @peer_id, namespaces)
@@ -370,12 +389,34 @@ module Hbase
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
       peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
-        replication_admin.show_peer_namespaces(peer_config))
+                   replication_admin.show_peer_namespaces(peer_config))
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
     end
 
+    define_test 'set_peer_replicate_all' do
+      cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
+
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+
+      assert_equal(1, command(:list_peers).length)
+      peer_config = command(:list_peers).get(0).getPeerConfig
+      assert_equal(true, peer_config.replicateAllUserTables)
+
+      command(:set_peer_replicate_all, @peer_id, false)
+      peer_config = command(:list_peers).get(0).getPeerConfig
+      assert_equal(false, peer_config.replicateAllUserTables)
+
+      command(:set_peer_replicate_all, @peer_id, true)
+      peer_config = command(:list_peers).get(0).getPeerConfig
+      assert_equal(true, peer_config.replicateAllUserTables)
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
     define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }


Mime
View raw message