hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject hbase git commit: HBASE-19492 Add EXCLUDE_NAMESPACE and EXCLUDE_TABLECFS support to replication peer config
Date Tue, 19 Dec 2017 08:56:19 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 7a7e55b60 -> 03e79b799


HBASE-19492 Add EXCLUDE_NAMESPACE and EXCLUDE_TABLECFS support to replication peer config


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/03e79b79
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/03e79b79
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/03e79b79

Branch: refs/heads/master
Commit: 03e79b79949a63f7320b4c51fecca40f93659bd9
Parents: 7a7e55b
Author: Guanghao Zhang <zghao@apache.org>
Authored: Tue Dec 12 11:19:57 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Tue Dec 19 16:53:43 2017 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfigUtil.java  |  35 ++-
 .../replication/ReplicationPeerConfig.java      |  65 ++++--
 .../src/main/protobuf/Replication.proto         |   2 +
 .../replication/ReplicationPeersZKImpl.java     |   2 +
 .../master/replication/ReplicationManager.java  |  51 +++--
 .../NamespaceTableCfWALEntryFilter.java         | 101 ++++++---
 .../replication/TestReplicationAdmin.java       | 219 ++++++++++++++++---
 .../TestReplicationWALEntryFilters.java         | 131 +++++++++--
 .../src/main/ruby/hbase/replication_admin.rb    |  49 ++++-
 hbase-shell/src/main/ruby/shell.rb              |   2 +
 .../src/main/ruby/shell/commands/list_peers.rb  |  17 +-
 .../commands/set_peer_exclude_namespaces.rb     |  52 +++++
 .../shell/commands/set_peer_exclude_tableCFs.rb |  51 +++++
 .../ruby/shell/commands/set_peer_namespaces.rb  |   8 +-
 .../shell/commands/set_peer_replicate_all.rb    |   8 +-
 .../ruby/shell/commands/set_peer_tableCFs.rb    |   6 +-
 .../test/ruby/hbase/replication_admin_test.rb   |  42 ++++
 17 files changed, 715 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 52a3c93..d8c86f0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Helper for TableCFs Operations.
@@ -289,11 +290,8 @@ public final class ReplicationPeerConfigUtil {
 
     List<ByteString> namespacesList = peer.getNamespacesList();
     if (namespacesList != null && namespacesList.size() != 0) {
-      Set<String> namespaces = new HashSet<>();
-      for (ByteString namespace : namespacesList) {
-        namespaces.add(namespace.toStringUtf8());
-      }
-      peerConfig.setNamespaces(namespaces);
+      peerConfig.setNamespaces(
+        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
     }
 
     if (peer.hasBandwidth()) {
@@ -304,6 +302,19 @@ public final class ReplicationPeerConfigUtil {
       peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
     }
 
+    Map<TableName, ? extends Collection<String>> excludeTableCFsMap =
+        convert2Map(peer.getExcludeTableCfsList()
+            .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
+    if (excludeTableCFsMap != null) {
+      peerConfig.setExcludeTableCFsMap(excludeTableCFsMap);
+    }
+
+    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
+    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
+      peerConfig.setExcludeNamespaces(
+        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
+    }
+
     return peerConfig;
   }
 
@@ -346,6 +357,20 @@ public final class ReplicationPeerConfigUtil {
 
     builder.setBandwidth(peerConfig.getBandwidth());
     builder.setReplicateAll(peerConfig.replicateAllUserTables());
+
+    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
+    if (excludeTableCFs != null) {
+      for (int i = 0; i < excludeTableCFs.length; i++) {
+        builder.addExcludeTableCfs(excludeTableCFs[i]);
+      }
+    }
+    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
+    if (excludeNamespaces != null) {
+      for (String namespace : excludeNamespaces) {
+        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
+      }
+    }
+
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 9e20829..52a5fe9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -44,6 +44,8 @@ public class ReplicationPeerConfig {
   private long bandwidth = 0;
   // Default value is true, means replicate all user tables to peer cluster.
   private boolean replicateAllUserTables = true;
+  private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
+  private Set<String> excludeNamespaces = null;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -121,16 +123,44 @@ public class ReplicationPeerConfig {
     return this;
   }
 
+  public Map<TableName, List<String>> getExcludeTableCFsMap() {
+    return (Map<TableName, List<String>>) excludeTableCFsMap;
+  }
+
+  public ReplicationPeerConfig setExcludeTableCFsMap(Map<TableName,
+                                              ? extends Collection<String>> tableCFsMap) {
+    this.excludeTableCFsMap = tableCFsMap;
+    return this;
+  }
+
+  public Set<String> getExcludeNamespaces() {
+    return this.excludeNamespaces;
+  }
+
+  public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
+    this.excludeNamespaces = namespaces;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
     builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
-    if (namespaces != null) {
-      builder.append("namespaces=").append(namespaces.toString()).append(",");
-    }
-    if (tableCFsMap != null) {
-      builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
+    if (replicateAllUserTables) {
+      if (excludeNamespaces != null) {
+        builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(",");
+      }
+      if (excludeTableCFsMap != null) {
+        builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(",");
+      }
+    } else {
+      if (namespaces != null) {
+        builder.append("namespaces=").append(namespaces.toString()).append(",");
+      }
+      if (tableCFsMap != null) {
+        builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
+      }
     }
     builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
@@ -142,17 +172,22 @@ public class ReplicationPeerConfig {
    * @return true if the table need replicate to the peer cluster
    */
   public boolean needToReplicate(TableName table) {
-    // If null means user has explicitly not configured any namespaces and table CFs
-    // so all the tables data are applicable for replication
-    if (namespaces == null && tableCFsMap == null) {
-      return true;
-    }
-    if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
-      return true;
-    }
-    if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
+    if (replicateAllUserTables) {
+      if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
+        return false;
+      }
+      if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
+        return false;
+      }
       return true;
+    } else {
+      if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
+        return true;
+      }
+      if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
+        return true;
+      }
+      return false;
     }
-    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index a1a7ade..8657c25 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -46,6 +46,8 @@ message ReplicationPeer {
   repeated bytes namespaces = 6;
   optional int64 bandwidth = 7;
   optional bool replicate_all = 8;
+  repeated TableCF exclude_table_cfs = 9;
+  repeated bytes exclude_namespaces = 10;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 2c3bbd5..ca99f65 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -369,6 +369,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     existingConfig.setNamespaces(newConfig.getNamespaces());
     existingConfig.setBandwidth(newConfig.getBandwidth());
     existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
+    existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces());
+    existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap());
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
index f2a6c85..749448d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -118,15 +118,32 @@ public class ReplicationManager {
     return peers;
   }
 
-  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
-      IOException {
+  /**
+   * If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
+   * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
+   * peer cluster.
+   *
+   * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
+   * Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
+   */
+  private void checkPeerConfig(ReplicationPeerConfig peerConfig)
+      throws ReplicationException, IOException {
     if (peerConfig.replicateAllUserTables()) {
       if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
           || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
-        throw new ReplicationException(
-          "Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster");
+        throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
+            + " when replicate_all flag is true");
       }
+      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
+        peerConfig.getExcludeTableCFsMap());
     } else {
+      if ((peerConfig.getExcludeNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+          || (peerConfig.getExcludeTableCFsMap() != null
+              && !peerConfig.getTableCFsMap().isEmpty())) {
+        throw new ReplicationException(
+            "Need clean exclude-namespaces or exclude-table-cfs config firstly"
+                + " when replicate_all flag is false");
+      }
       checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
         peerConfig.getTableCFsMap());
     }
@@ -134,17 +151,19 @@ public class ReplicationManager {
   }
 
   /**
-   * Set a namespace in the peer config means that all tables in this namespace
-   * will be replicated to the peer cluster.
+   * Set a namespace in the peer config means that all tables in this namespace will be replicated
+   * to the peer cluster.
+   * 1. If peer config already has a namespace, then not allow set any table of this namespace
+   *    to the peer config.
+   * 2. If peer config already has a table, then not allow set this table's namespace to the peer
+   *    config.
    *
-   * 1. If you already have set a namespace in the peer config, then you can't set any table
-   *    of this namespace to the peer config.
-   * 2. If you already have set a table in the peer config, then you can't set this table's
-   *    namespace to the peer config.
-   *
-   * @param namespaces
-   * @param tableCfs
-   * @throws ReplicationException
+   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
+   * replicated to the peer cluster.
+   * 1. If peer config already has a exclude namespace, then not allow set any exclude table of
+   *    this namespace to the peer config.
+   * 2. If peer config already has a exclude table, then not allow set this table's namespace
+   *    as a exclude namespace.
    */
   private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
@@ -157,8 +176,8 @@ public class ReplicationManager {
     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
       TableName table = entry.getKey();
       if (namespaces.contains(table.getNamespaceAsString())) {
-        throw new ReplicationException(
-            "Table-cfs config conflict with namespaces config in peer");
+        throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
+            + table.getNamespaceAsString() + " in peer config");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 9a4cc6c..5068cce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -32,16 +32,17 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
-
 /**
- * Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry
- * by namespaces config, then filter entry by table-cfs config.
+ * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
+ * exclude namespaces config, and exclude table-cfs config.
+ *
+ * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
+ * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
+ * Note: set a exclude namespace means that all tables in this namespace can't be replicated.
  *
- * 1. Set a namespace in peer config means that all tables in this namespace will be replicated.
- * 2. If the namespaces config is null, then the table-cfs config decide which table's edit
- *    can be replicated. If the table-cfs config is null, then the namespaces config decide
- *    which table's edit can be replicated.
+ * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
+ * But you can set namespaces or table-cfs which will be replicated to peer cluster.
+ * Note: set a namespace means that all tables in this namespace will be replicated.
  */
 @InterfaceAudience.Private
 public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
@@ -61,7 +62,15 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
     ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
 
     if (peerConfig.replicateAllUserTables()) {
-      // replicate all user tables, so return entry directly
+      // replicate all user tables, but filter by exclude namespaces config
+      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
+
+      // return null(prevent replicating) if logKey's table is in this peer's
+      // exclude namespaces list
+      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
+        return null;
+      }
+
       return entry;
     } else {
       // Not replicate all user tables, so filter by namespaces and table-cfs config
@@ -80,7 +89,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
 
       // Then filter by table-cfs config
       // return null(prevent replicating) if logKey's table isn't in this peer's
-      // replicaable namespace list and table list
+      // replicable tables list
       if (tableCFs == null || !tableCFs.containsKey(tabName)) {
         return null;
       }
@@ -93,34 +102,39 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
   public Cell filterCell(final Entry entry, Cell cell) {
     ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
     if (peerConfig.replicateAllUserTables()) {
-      // replicate all user tables, so return cell directly
+      // replicate all user tables, but filter by exclude table-cfs config
+      final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
+      if (excludeTableCfs == null) {
+        return cell;
+      }
+
+      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+        cell = bulkLoadFilter.filterCell(cell,
+          fam -> filterByExcludeTableCfs(entry.getKey().getTablename(), Bytes.toString(fam),
+            excludeTableCfs));
+      } else {
+        if (filterByExcludeTableCfs(entry.getKey().getTablename(),
+          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
+          excludeTableCfs)) {
+          return null;
+        }
+      }
+
       return cell;
     } else {
+      // not replicate all user tables, so filter by table-cfs config
       final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
       if (tableCfs == null) {
         return cell;
       }
-      TableName tabName = entry.getKey().getTablename();
-      List<String> cfs = tableCfs.get(tabName);
-      // ignore(remove) kv if its cf isn't in the replicable cf list
-      // (empty cfs means all cfs of this table are replicable)
+
       if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
-          @Override
-          public boolean apply(byte[] fam) {
-            if (tableCfs != null) {
-              List<String> cfs = tableCfs.get(entry.getKey().getTablename());
-              if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
-                return true;
-              }
-            }
-            return false;
-          }
-        });
+        cell = bulkLoadFilter.filterCell(cell,
+          fam -> filterByTableCfs(entry.getKey().getTablename(), Bytes.toString(fam), tableCfs));
       } else {
-        if ((cfs != null)
-            && !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
-              cell.getFamilyLength()))) {
+        if (filterByTableCfs(entry.getKey().getTablename(),
+          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
+          tableCfs)) {
           return null;
         }
       }
@@ -128,4 +142,31 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
       return cell;
     }
   }
+
+  private boolean filterByExcludeTableCfs(TableName tableName, String family,
+      Map<TableName, List<String>> excludeTableCfs) {
+    List<String> excludeCfs = excludeTableCfs.get(tableName);
+    if (excludeCfs != null) {
+      // empty cfs means all cfs of this table are excluded
+      if (excludeCfs.isEmpty()) {
+        return true;
+      }
+      // ignore(remove) kv if its cf is in the exclude cfs list
+      if (excludeCfs.contains(family)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean filterByTableCfs(TableName tableName, String family,
+      Map<TableName, List<String>> tableCfs) {
+    List<String> cfs = tableCfs.get(tableName);
+    // ignore(remove) kv if its cf isn't in the replicable cf list
+    // (empty cfs means all cfs of this table are replicable)
+    if (cfs != null && !cfs.contains(family)) {
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 19f117b..83a2e12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -483,8 +483,97 @@ public class TestReplicationAdmin {
   }
 
   @Test
+  public void testPeerExcludeNamespaces() throws Exception {
+    String ns1 = "ns1";
+    String ns2 = "ns2";
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    assertTrue(rpc.replicateAllUserTables());
+
+    Set<String> namespaces = new HashSet<String>();
+    namespaces.add(ns1);
+    namespaces.add(ns2);
+    rpc.setExcludeNamespaces(namespaces);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
+    assertEquals(2, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+    assertTrue(namespaces.contains(ns2));
+
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    namespaces.clear();
+    namespaces.add(ns1);
+    rpc.setExcludeNamespaces(namespaces);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
+    assertEquals(1, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+  }
+
+  @Test
+  public void testPeerExcludeTableCFs() throws Exception {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    TableName tab1 = TableName.valueOf("t1");
+    TableName tab2 = TableName.valueOf("t2");
+    TableName tab3 = TableName.valueOf("t3");
+    TableName tab4 = TableName.valueOf("t4");
+
+    // Add a valid peer
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    assertTrue(rpc.replicateAllUserTables());
+
+    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
+    tableCFs.put(tab1, null);
+    rpc.setExcludeTableCFsMap(tableCFs);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    Map<TableName, List<String>> result =
+        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
+    assertEquals(1, result.size());
+    assertEquals(true, result.containsKey(tab1));
+    assertNull(result.get(tab1));
+
+    tableCFs.put(tab2, new ArrayList<String>());
+    tableCFs.get(tab2).add("f1");
+    rpc.setExcludeTableCFsMap(tableCFs);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t1", result.containsKey(tab1));
+    assertTrue("Should contain t2", result.containsKey(tab2));
+    assertNull(result.get(tab1));
+    assertEquals(1, result.get(tab2).size());
+    assertEquals("f1", result.get(tab2).get(0));
+
+    tableCFs.clear();
+    tableCFs.put(tab3, new ArrayList<String>());
+    tableCFs.put(tab4, new ArrayList<String>());
+    tableCFs.get(tab4).add("f1");
+    tableCFs.get(tab4).add("f2");
+    rpc.setExcludeTableCFsMap(tableCFs);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t3", result.containsKey(tab3));
+    assertTrue("Should contain t4", result.containsKey(tab4));
+    assertNull(result.get(tab3));
+    assertEquals(2, result.get(tab4).size());
+    assertEquals("f1", result.get(tab4).get(0));
+    assertEquals("f2", result.get(tab4).get(1));
+
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+  }
+
+  @Test
   public void testPeerConfigConflict() throws Exception {
-    // Default replicate all flag is true
+    // Default replicate_all flag is true
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
 
@@ -492,14 +581,15 @@ public class TestReplicationAdmin {
     Set<String> namespaces = new HashSet<String>();
     namespaces.add(ns1);
 
-    TableName tab1 = TableName.valueOf("ns1:tabl");
+    TableName tab1 = TableName.valueOf("ns2:tabl");
     Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(tab1, new ArrayList<String>());
 
     try {
       rpc.setNamespaces(namespaces);
       hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
-      fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
+      fail("Should throw Exception."
+          + " When replicate all flag is true, no need to config namespaces");
     } catch (IOException e) {
       // OK
       rpc.setNamespaces(null);
@@ -508,23 +598,51 @@ public class TestReplicationAdmin {
     try {
       rpc.setTableCFsMap(tableCfs);
       hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
-      fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
+      fail("Should throw Exception."
+          + " When replicate all flag is true, no need to config table-cfs");
     } catch (IOException e) {
       // OK
       rpc.setTableCFsMap(null);
     }
 
+    // Set replicate_all flag to true
+    rpc.setReplicateAllUserTables(false);
     try {
-      rpc.setNamespaces(namespaces);
-      rpc.setTableCFsMap(tableCfs);
+      rpc.setExcludeNamespaces(namespaces);
       hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
       fail("Should throw Exception."
-          + " When replicate all flag is true, no need to config namespaces or table-cfs");
+          + " When replicate all flag is false, no need to config exclude namespaces");
     } catch (IOException e) {
       // OK
-      rpc.setNamespaces(null);
-      rpc.setTableCFsMap(null);
+      rpc.setExcludeNamespaces(null);
+    }
+
+    try {
+      rpc.setExcludeTableCFsMap(tableCfs);
+      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+      fail("Should throw Exception."
+          + " When replicate all flag is false, no need to config exclude table-cfs");
+    } catch (IOException e) {
+      // OK
+      rpc.setExcludeTableCFsMap(null);
     }
+
+    rpc.setNamespaces(namespaces);
+    rpc.setTableCFsMap(tableCfs);
+    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+    // Default replicate_all flag is true
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
+    rpc2.setExcludeNamespaces(namespaces);
+    rpc2.setExcludeTableCFsMap(tableCfs);
+    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
+    // table-cfs config
+    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
+
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+    hbaseAdmin.removeReplicationPeer(ID_SECOND);
   }
 
   @Test
@@ -539,41 +657,80 @@ public class TestReplicationAdmin {
     rpc.setReplicateAllUserTables(false);
     hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
-    rpc = admin.getPeerConfig(ID_ONE);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     Set<String> namespaces = new HashSet<String>();
     namespaces.add(ns1);
     rpc.setNamespaces(namespaces);
-    admin.updatePeerConfig(ID_ONE, rpc);
-    rpc = admin.getPeerConfig(ID_ONE);
-    Map<TableName, List<String>> tableCfs = new HashMap<>();
-    tableCfs.put(tableName1, new ArrayList<>());
-    rpc.setTableCFsMap(tableCfs);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     try {
-      admin.updatePeerConfig(ID_ONE, rpc);
-      fail("Should throw ReplicationException, because table " + tableName1 + " conflict with namespace "
-          + ns1);
-    } catch (IOException e) {
+      Map<TableName, List<String>> tableCfs = new HashMap<>();
+      tableCfs.put(tableName1, new ArrayList<>());
+      rpc.setTableCFsMap(tableCfs);
+      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+      fail("Should throw ReplicationException" + " Because table " + tableName1
+          + " conflict with namespace " + ns1);
+    } catch (Exception e) {
       // OK
     }
 
-    rpc = admin.getPeerConfig(ID_ONE);
-    tableCfs.clear();
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
     tableCfs.put(tableName2, new ArrayList<>());
     rpc.setTableCFsMap(tableCfs);
-    admin.updatePeerConfig(ID_ONE, rpc);
-    rpc = admin.getPeerConfig(ID_ONE);
-    namespaces.clear();
-    namespaces.add(ns2);
-    rpc.setNamespaces(namespaces);
+    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     try {
-      admin.updatePeerConfig(ID_ONE, rpc);
-      fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table "
-          + tableName2);
-    } catch (IOException e) {
+      namespaces.clear();
+      namespaces.add(ns2);
+      rpc.setNamespaces(namespaces);
+      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+      fail("Should throw ReplicationException" + " Because namespace " + ns2
+          + " conflict with table " + tableName2);
+    } catch (Exception e) {
       // OK
     }
 
-    admin.removePeer(ID_ONE);
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
+    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
+
+    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
+    Set<String> excludeNamespaces = new HashSet<String>();
+    excludeNamespaces.add(ns1);
+    rpc2.setExcludeNamespaces(excludeNamespaces);
+    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
+    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
+    try {
+      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
+      excludeTableCfs.put(tableName1, new ArrayList<>());
+      rpc2.setExcludeTableCFsMap(excludeTableCfs);
+      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
+      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
+          + " conflict with exclude namespace " + ns1);
+    } catch (Exception e) {
+      // OK
+    }
+
+    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
+    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
+    excludeTableCfs.put(tableName2, new ArrayList<>());
+    rpc2.setExcludeTableCFsMap(excludeTableCfs);
+    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
+    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
+    try {
+      namespaces.clear();
+      namespaces.add(ns2);
+      rpc2.setNamespaces(namespaces);
+      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
+      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
+          + " conflict with exclude table " + tableName2);
+    } catch (Exception e) {
+      // OK
+    }
+
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+    hbaseAdmin.removeReplicationPeer(ID_SECOND);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 608d22b..7c3773a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -22,8 +22,7 @@ import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +46,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
 @Category({ReplicationTests.class, SmallTests.class})
 public class TestReplicationWALEntryFilters {
 
@@ -205,23 +206,17 @@ public class TestReplicationWALEntryFilters {
     ReplicationPeer peer = mock(ReplicationPeer.class);
     ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
 
-    // 1. replicate all user tables
-    when(peerConfig.replicateAllUserTables()).thenReturn(true);
-    when(peer.getPeerConfig()).thenReturn(peerConfig);
-    Entry userEntry = createEntry(null, a, b, c);
-    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
-    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
-
-    // 2. not replicate all user tables, no namespaces and table-cfs config
+    // 1. replicate_all flag is false, no namespaces and table-cfs config
     when(peerConfig.replicateAllUserTables()).thenReturn(false);
     when(peerConfig.getNamespaces()).thenReturn(null);
     when(peerConfig.getTableCFsMap()).thenReturn(null);
     when(peer.getPeerConfig()).thenReturn(peerConfig);
-    userEntry = createEntry(null, a, b, c);
-    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    Entry userEntry = createEntry(null, a, b, c);
+    ChainWALEntryFilter filter =
+        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
-    // 3. Only config table-cfs in peer
+    // 2. replicate_all flag is false, and only config table-cfs in peer
     // empty map
     userEntry = createEntry(null, a, b, c);
     Map<TableName, List<String>> tableCfs = new HashMap<>();
@@ -261,7 +256,7 @@ public class TestReplicationWALEntryFilters {
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,c), filter.filter(userEntry));
 
-    // 3. Only config namespaces in peer
+    // 3. replicate_all flag is false, and only config namespaces in peer
     when(peer.getTableCFs()).thenReturn(null);
     // empty set
     Set<String> namespaces = new HashSet<>();
@@ -292,7 +287,7 @@ public class TestReplicationWALEntryFilters {
     filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
-    // 4. Config namespaces and table-cfs both
+    // 4. replicate_all flag is false, and config namespaces and table-cfs both
     // Namespaces config should not confict with table-cfs config
     namespaces = new HashSet<>();
     tableCfs = new HashMap<>();
@@ -331,9 +326,110 @@ public class TestReplicationWALEntryFilters {
     assertEquals(null, filter.filter(userEntry));
   }
 
+  @Test
+  public void testNamespaceTableCfWALEntryFilter2() {
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
+
+    // 1. replicate_all flag is true
+    // and no exclude namespaces and no exclude table-cfs config
+    when(peerConfig.replicateAllUserTables()).thenReturn(true);
+    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    Entry userEntry = createEntry(null, a, b, c);
+    ChainWALEntryFilter filter =
+        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // 2. replicate_all flag is true, and only config exclude namespaces
+    // empty set
+    Set<String> namespaces = new HashSet<String>();
+    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // exclude namespace default
+    namespaces.add("default");
+    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
+
+    // exclude namespace ns1
+    namespaces = new HashSet<String>();
+    namespaces.add("ns1");
+    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // 3. replicate_all flag is true, and only config exclude table-cfs
+    // empty table-cfs map
+    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
+    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // exclude table bar
+    tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(TableName.valueOf("bar"), null);
+    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    // exclude table foo:a
+    tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
+    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, b, c), filter.filter(userEntry));
+
+    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
+    // exclude ns1 and table foo:a,c
+    namespaces = new HashSet<String>();
+    tableCfs = new HashMap<TableName, List<String>>();
+    namespaces.add("ns1");
+    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
+    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, b), filter.filter(userEntry));
+
+    // exclude namespace default and table ns1:bar
+    namespaces = new HashSet<String>();
+    tableCfs = new HashMap<TableName, List<String>>();
+    namespaces.add("default");
+    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
+    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
+    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
+    when(peer.getPeerConfig()).thenReturn(peerConfig);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
+  }
+
   private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
-    WALKeyImpl key1 =
-        new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes);
+    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
+      System.currentTimeMillis(), scopes);
     WALEdit edit1 = new WALEdit();
 
     for (byte[] kv : kvs) {
@@ -342,7 +438,6 @@ public class TestReplicationWALEntryFilters {
     return new Entry(key1, edit1);
   }
 
-
   private void assertEquals(Entry e1, Entry e2) {
     Assert.assertEquals(e1 == null, e2 == null);
     if (e1 == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 50c086a..949bf68 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -152,7 +152,11 @@ module Hbase
     # Show the current tableCFs config for the specified peer
     def show_peer_tableCFs(id)
       rpc = @admin.getReplicationPeerConfig(id)
-      ReplicationPeerConfigUtil.convertToString(rpc.getTableCFsMap)
+      show_peer_tableCFs_by_config(rpc)
+    end
+
+    def show_peer_tableCFs_by_config(peer_config)
+      ReplicationPeerConfigUtil.convertToString(peer_config.getTableCFsMap)
     end
 
     #----------------------------------------------------------------------------------------------
@@ -274,6 +278,49 @@ module Hbase
       @replication_admin.updatePeerConfig(id, rpc)
     end
 
+    # Set exclude namespaces config for the specified peer
+    def set_peer_exclude_namespaces(id, exclude_namespaces)
+      return if exclude_namespaces.nil?
+      exclude_ns_set = java.util.HashSet.new
+      exclude_namespaces.each do |n|
+        exclude_ns_set.add(n)
+      end
+      rpc = get_peer_config(id)
+      return if rpc.nil?
+      rpc.setExcludeNamespaces(exclude_ns_set)
+      @admin.updateReplicationPeerConfig(id, rpc)
+    end
+
+    # Show the exclude namespaces config for the specified peer
+    def show_peer_exclude_namespaces(peer_config)
+      namespaces = peer_config.getExcludeNamespaces
+      return nil if namespaces.nil?
+      namespaces = java.util.ArrayList.new(namespaces)
+      java.util.Collections.sort(namespaces)
+      '!' + namespaces.join(';')
+    end
+
+    # Set exclude tableCFs config for the specified peer
+    def set_peer_exclude_tableCFs(id, exclude_tableCFs)
+      return if exclude_tableCFs.nil?
+      # convert tableCFs to TableName
+      map = java.util.HashMap.new
+      exclude_tableCFs.each do |key, val|
+        map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+      end
+      rpc = get_peer_config(id)
+      return if rpc.nil?
+      rpc.setExcludeTableCFsMap(map)
+      @admin.updateReplicationPeerConfig(id, rpc)
+    end
+
+    # Show the exclude tableCFs config for the specified peer
+    def show_peer_exclude_tableCFs(peer_config)
+      tableCFs = peer_config.getExcludeTableCFsMap
+      return nil if tableCFs.nil?
+      '!' + ReplicationPeerConfigUtil.convertToString(tableCFs)
+    end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 58886fc..a01a890 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -381,8 +381,10 @@ Shell.load_command_group(
     set_peer_namespaces
     append_peer_namespaces
     remove_peer_namespaces
+    set_peer_exclude_namespaces
     show_peer_tableCFs
     set_peer_tableCFs
+    set_peer_exclude_tableCFs
     set_peer_bandwidth
     list_replicated_tables
     append_peer_tableCFs

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 6812df4..522d23d 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -23,7 +23,13 @@ module Shell
     class ListPeers < Command
       def help
         <<-EOF
-List all replication peer clusters.
+  List all replication peer clusters.
+
+  If replicate_all flag is false, the namespaces and table-cfs in peer config
+  will be replicated to peer cluster.
+
+  If replicate_all flag is true, all user tables will be replicate to peer
+  cluster, except that the namespaces and table-cfs in peer config.
 
   hbase> list_peers
 EOF
@@ -39,8 +45,13 @@ EOF
           id = peer.getPeerId
           state = peer.isEnabled ? 'ENABLED' : 'DISABLED'
           config = peer.getPeerConfig
-          namespaces = replication_admin.show_peer_namespaces(config)
-          tableCFs = replication_admin.show_peer_tableCFs(id)
+          if config.replicateAllUserTables
+            namespaces = replication_admin.show_peer_exclude_namespaces(config)
+            tableCFs = replication_admin.show_peer_exclude_tableCFs(config)
+          else
+            namespaces = replication_admin.show_peer_namespaces(config)
+            tableCFs = replication_admin.show_peer_tableCFs_by_config(config)
+          end
           formatter.row([id, config.getClusterKey,
                          config.getReplicationEndpointImpl, state,
                          config.replicateAllUserTables, namespaces, tableCFs,

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb
new file mode 100644
index 0000000..bf9b90b
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb
@@ -0,0 +1,52 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerExcludeNamespaces < Command
+      def help
+        <<-EOF
+  Set the namespaces which not replicated for the specified peer.
+
+  Note:
+  1. The replicate_all flag need to be true when set exclude namespaces.
+  2. Set a exclude namespace in the peer config means that all tables in this
+     namespace will not be replicated to the peer cluster. If peer config
+     already has a exclude table, then not allow set this table's namespace
+     as a exclude namespace.
+
+  Examples:
+
+    # set exclude namespaces config to null
+    hbase> set_peer_exclude_namespaces '1', []
+    # set namespaces which not replicated for a peer.
+    # set a exclude namespace in the peer config means that all tables in this
+    # namespace will not be replicated.
+    hbase> set_peer_exclude_namespaces '2', ["ns1", "ns2"]
+
+  EOF
+      end
+
+      def command(id, exclude_namespaces)
+        replication_admin.set_peer_exclude_namespaces(id, exclude_namespaces)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb
new file mode 100644
index 0000000..25be364
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb
@@ -0,0 +1,51 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerExcludeTableCFs < Command
+      def help
+        <<-EOF
+  Set the table-cfs which not replicated for the specified peer.
+
+  Note:
+  1. The replicate_all flag need to be true when set exclude table-cfs.
+  2. If peer config already has a exclude namespace, then not allow set any
+     exclude table of this namespace to the peer config.
+
+  Examples:
+
+    # set exclude table-cfs to null
+    hbase> set_peer_exclude_tableCFs '1'
+    # set table / table-cf which not replicated for a peer, for a table without
+    # an explicit column-family list, all column-families will not be replicated
+    hbase> set_peer_exclude_tableCFs '2', { "ns1:table1" => [],
+                                    "ns2:table2" => ["cf1", "cf2"],
+                                    "ns3:table3" => ["cfA", "cfB"]}
+
+  EOF
+      end
+
+      def command(id, exclude_peer_table_cfs = nil)
+        replication_admin.set_peer_exclude_tableCFs(id, exclude_peer_table_cfs)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
index 6d14c1c..9f0649d 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
@@ -25,10 +25,10 @@ module Shell
         <<-EOF
   Set the replicable namespaces config for the specified peer.
 
-  Set a namespace in the peer config means that all tables in this
-  namespace will be replicated to the peer cluster. So if you already
-  have set a namespace in the peer config, then you can't set this
-  namespace's tables in the peer config again.
+  1. The replicate_all flag need to be false when set the replicable namespaces.
+  2. Set a namespace in the peer config means that all tables in this namespace
+     will be replicated to the peer cluster. If peer config already has a table,
+     then not allow set this table's namespace to the peer config.
 
   Examples:
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
index f6de615..8996964 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
@@ -26,7 +26,10 @@ module Shell
   Set the replicate_all flag to true or false for the specified peer.
 
   If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
-  will be replicate to peer cluster.
+  will be replicate to peer cluster. But you can use 'set_peer_exclude_namespaces'
+  to set which namespaces can't be replicated to peer cluster. And you can use
+  'set_peer_exclude_tableCFs' to set which tables can't be replicated to peer
+  cluster.
 
   If replicate_all flag is false, then all user tables cannot be replicate to
   peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
@@ -36,6 +39,9 @@ module Shell
 
   Notice: When you want to change a peer's replicate_all flag from false to true,
           you need clean the peer's NAMESPACES and TABLECFS config firstly.
+          When you want to change a peer's replicate_all flag from true to false,
+          you need clean the peer's EXCLUDE_NAMESPACES and EXCLUDE_TABLECFS
+          config firstly.
 
   Examples:
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
index 6da2f11..03b2186 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
@@ -25,8 +25,10 @@ module Shell
         <<-EOF
   Set the replicable table-cf config for the specified peer.
 
-  Can't set a table to table-cfs config if it's namespace already was in
-  namespaces config of this peer.
+  Note:
+  1. The replicate_all flag need to be false when set the replicable table-cfs.
+  2. Can't set a table to table-cfs config if it's namespace already was in
+     namespaces config of this peer.
 
   Examples:
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/03e79b79/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 4b74ada..0f84396 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -294,6 +294,29 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
+    define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do
+      cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+
+      assert_equal(1, command(:list_peers).length)
+      peer = command(:list_peers).get(0)
+      assert_equal(@peer_id, peer.getPeerId)
+      assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+
+      table_cfs = { 'table1' => [], 'table2' => ['cf1'],
+                    'ns3:table3' => ['cf1', 'cf2'] }
+      command(:set_peer_exclude_tableCFs, @peer_id, table_cfs)
+      assert_equal(1, command(:list_peers).length)
+      peer = command(:list_peers).get(0)
+      peer_config = peer.getPeerConfig
+      assert_equal(true, peer_config.replicateAllUserTables)
+      assert_tablecfs_equal(table_cfs, peer_config.getExcludeTableCFsMap)
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
     define_test "set_peer_namespaces: works with namespaces array" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       namespaces = ["ns1", "ns2"]
@@ -395,6 +418,25 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
+    define_test 'set_peer_exclude_namespaces: works with namespaces array' do
+      cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
+      namespaces = ['ns1', 'ns2']
+      namespaces_str = '!ns1;ns2'
+
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+      command(:set_peer_exclude_namespaces, @peer_id, namespaces)
+
+      assert_equal(1, command(:list_peers).length)
+      peer_config = command(:list_peers).get(0).getPeerConfig
+      assert_equal(true, peer_config.replicateAllUserTables)
+      assert_equal(namespaces_str,
+                   replication_admin.show_peer_exclude_namespaces(peer_config))
+
+      # cleanup for future tests
+      command(:remove_peer, @peer_id)
+    end
+
     define_test 'set_peer_replicate_all' do
       cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
 


Mime
View raw message