hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject git commit: HBASE-11897 Add append and remove peer table-cfs cmds for replication (Liu Shaoqui)
Date Fri, 12 Sep 2014 16:29:31 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 ba87dcc04 -> 511b20a22


HBASE-11897 Add append and remove peer table-cfs cmds for replication (Liu Shaoqui)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/511b20a2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/511b20a2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/511b20a2

Branch: refs/heads/branch-1
Commit: 511b20a227fca5ba3d283caf4f847c852d049451
Parents: ba87dcc
Author: stack <stack@apache.org>
Authored: Fri Sep 12 09:28:52 2014 -0700
Committer: stack <stack@apache.org>
Committed: Fri Sep 12 09:29:21 2014 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 159 +++++++++++++++++++
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   4 +-
 .../hbase/replication/ReplicationPeer.java      |   3 +-
 .../replication/ReplicationPeerZKImpl.java      |  59 +------
 .../hbase/replication/ReplicationPeers.java     |   3 +-
 .../replication/ReplicationPeersZKImpl.java     |   4 +-
 .../replication/TableCfWALEntryFilter.java      |   5 +-
 .../replication/TestReplicationAdmin.java       |  56 ++++++-
 .../replication/TestPerTableCFReplication.java  |  96 +++++------
 .../TestReplicationWALEntryFilters.java         |  16 +-
 .../src/main/ruby/hbase/replication_admin.rb    |  12 ++
 hbase-shell/src/main/ruby/shell.rb              |   2 +
 .../ruby/shell/commands/append_peer_tableCFs.rb |  41 +++++
 .../ruby/shell/commands/remove_peer_tableCFs.rb |  42 +++++
 14 files changed, 386 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 4028d87..39c802f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -22,10 +22,14 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 /**
  * <p>
@@ -169,6 +174,55 @@ public class ReplicationAdmin implements Closeable {
     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
   }
 
+  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+
+    Map<TableName, List<String>> tableCFsMap = null;
+    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
+    // parse out (table, cf-list) pairs from tableCFsConfig
+    // format: "table1:cf1,cf2;table2:cfA,cfB"
+    String[] tables = tableCFsConfig.split(";");
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split to "table" and "cf1,cf2"
+      //   for each table: "table:cf1,cf2" or "table"
+      String[] pair = tab.split(":");
+      String tabName = pair[0].trim();
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.error("ignore invalid tableCFs setting: " + tab);
+        continue;
+      }
+
+      // 3 parse "cf1,cf2" part to List<cf>
+      List<String> cfs = null;
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            if (cfs == null) {
+              cfs = new ArrayList<String>();
+            }
+            cfs.add(cfName);
+          }
+        }
+      }
+
+      // 4 put <table, List<cf>> to map
+      if (tableCFsMap == null) {
+        tableCFsMap = new HashMap<TableName, List<String>>();
+      }
+      tableCFsMap.put(TableName.valueOf(tabName), cfs);
+    }
+    return tableCFsMap;
+  }
+
   @VisibleForTesting
   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
     String tableCfsStr = null;
@@ -265,6 +319,111 @@ public class ReplicationAdmin implements Closeable {
   }
 
   /**
+   * Append the replicable table-cf config of the specified peer
+   * @param id a short that identifies the cluster
+   * @param tableCfs table-cfs config str
+   * @throws KeeperException
+   */
+  public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
+    appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
+  }
+
+  /**
+   * Append the replicable table-cf config of the specified peer
+   * @param id a short that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   * @throws KeeperException
+   */
+  public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException {
+    if (tableCfs == null) {
+      throw new ReplicationException("tableCfs is null");
+    }
+    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    if (preTableCfs == null) {
+      setPeerTableCFs(id, tableCfs);
+      return;
+    }
+
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      Collection<String> appendCfs = entry.getValue();
+      if (preTableCfs.containsKey(table)) {
+        List<String> cfs = preTableCfs.get(table);
+        if (cfs == null || appendCfs == null) {
+          preTableCfs.put(table, null);
+        } else {
+          Set<String> cfSet = new HashSet<String>(cfs);
+          cfSet.addAll(appendCfs);
+          preTableCfs.put(table, Lists.newArrayList(cfSet));
+        }
+      } else {
+        if (appendCfs == null || appendCfs.isEmpty()) {
+          preTableCfs.put(table, null);
+        } else {
+          preTableCfs.put(table, Lists.newArrayList(appendCfs));
+        }
+      }
+    }
+    setPeerTableCFs(id, preTableCfs);
+  }
+
+  /**
+   * Remove some table-cfs from table-cfs config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCf table-cfs config str
+   * @throws ReplicationException
+   */
+  public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
+    removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
+  }
+
+  /**
+   * Remove some table-cfs from config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   * @throws ReplicationException
+   */
+  public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException {
+    if (tableCfs == null) {
+      throw new ReplicationException("tableCfs is null");
+    }
+
+    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    if (preTableCfs == null) {
+      throw new ReplicationException("Table-Cfs for peer" + id + " is null");
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      Collection<String> removeCfs = entry.getValue();
+      if (preTableCfs.containsKey(table)) {
+        List<String> cfs = preTableCfs.get(table);
+        if (cfs == null && removeCfs == null) {
+          preTableCfs.remove(table);
+        } else if (cfs != null && removeCfs != null) {
+          Set<String> cfSet = new HashSet<String>(cfs);
+          cfSet.removeAll(removeCfs);
+          if (cfSet.isEmpty()) {
+            preTableCfs.remove(table);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else if (cfs == null && removeCfs != null) {
+          throw new ReplicationException("Cannot remove cf of table: " + table
+              + " which doesn't specify cfs from table-cfs config in peer: " + id);
+        } else if (cfs != null && removeCfs == null) {
+          throw new ReplicationException("Cannot remove table: " + table
+              + " which has specified cfs from table-cfs config in peer: " + id);
+        }
+      } else {
+        throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
+      }
+    }
+    setPeerTableCFs(id, preTableCfs);
+  }
+
+  /**
    * Set the replicable table-cf config of the specified peer
    * @param id a short name that identifies the cluster
    * @param tableCfs the table and column-family list which will be replicated for this peer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index df6016a..4970b19 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1483,8 +1483,8 @@ public final class ProtobufUtil {
   /**
    * Convert a protocol buffer DeleteType to delete KeyValue type.
    *
-   * @param protocol buffer DeleteType
-   * @return type
+   * @param type The DeleteType
+   * @return The type.
    * @throws IOException
    */
   public static KeyValue.Type fromDeleteType(

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index c116674..729d2c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
 
 /**
  * ReplicationPeer manages enabled / disabled state for the peer.
@@ -67,6 +68,6 @@ public interface ReplicationPeer {
    * Get replicable (table, cf-list) map of this peer
    * @return the replicable (table, cf-list) map
    */
-  public Map<String, List<String>> getTableCFs();
+  public Map<TableName, List<String>> getTableCFs();
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index a39392c..0eaa744 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
@@ -49,7 +52,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   private final ReplicationPeerConfig peerConfig;
   private final String id;
   private volatile PeerState peerState;
-  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
+  private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
   private final Configuration conf;
 
   private PeerStateTracker peerStateTracker;
@@ -110,59 +113,9 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
     this.readTableCFsZnode();
   }
 
-  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    Map<String, List<String>> tableCFsMap = null;
-    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
-    // parse out (table, cf-list) pairs from tableCFsConfig
-    // format: "table1:cf1,cf2;table2:cfA,cfB"
-    String[] tables = tableCFsConfig.split(";");
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table:cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.error("ignore invalid tableCFs setting: " + tab);
-        continue;
-      }
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      List<String> cfs = null;
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            if (cfs == null) {
-              cfs = new ArrayList<String>();
-            }
-            cfs.add(cfName);
-          }
-        }
-      }
-
-      // 4 put <table, List<cf>> to map
-      if (tableCFsMap == null) {
-        tableCFsMap = new HashMap<String, List<String>>();
-      }
-      tableCFsMap.put(tabName, cfs);
-    }
-
-    return tableCFsMap;
-  }
-
   private void readTableCFsZnode() {
     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
-    this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
+    this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
   }
 
   @Override
@@ -202,7 +155,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    * @return the replicable (table, cf-list) map
    */
   @Override
-  public Map<String, List<String>> getTableCFs() {
+  public Map<TableName, List<String>> getTableCFs() {
     return this.tableCFs;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index b1c3b49..da54c54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -94,7 +95,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @return the table and column-family list which will be replicated for this peer
    */
-  public Map<String, List<String>> getTableCFs(String peerId);
+  public Map<TableName, List<String>> getTableCFs(String peerId);
 
   /**
    * Returns the ReplicationPeer

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index af028fb..a18d8e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -188,7 +190,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
+  public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
     ReplicationPeer replicationPeer = this.peerClusters.get(id);
     if (replicationPeer == null) {
       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 44e3c1c..0ea267d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -39,9 +40,9 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
 
   @Override
   public Entry filter(Entry entry) {
-    String tabName = entry.getKey().getTablename().getNameAsString();
+    TableName tabName = entry.getKey().getTablename();
     ArrayList<Cell> cells = entry.getEdit().getCells();
-    Map<String, List<String>> tableCFs = null;
+    Map<TableName, List<String>> tableCFs = null;
 
     try {
       tableCFs = this.peer.getTableCFs();

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 77bc64e..fcebfc5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -154,5 +155,58 @@ public class TestReplicationAdmin {
     assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
   }
 
-}
+  @Test
+  public void testAppendPeerTableCFs() throws Exception {
+    // Add a valid peer
+    admin.addPeer(ID_ONE, KEY_ONE);
+
+    admin.appendPeerTableCFs(ID_ONE, "t1");
+    assertEquals("t1", admin.getPeerTableCFs(ID_ONE));
+
+    // append table t2 to replication
+    admin.appendPeerTableCFs(ID_ONE, "t2");
+    assertEquals("t2;t1", admin.getPeerTableCFs(ID_ONE));
+
+    // append table column family: f1 of t3 to replication
+    admin.appendPeerTableCFs(ID_ONE, "t3:f1");
+    assertEquals("t3:f1;t2;t1", admin.getPeerTableCFs(ID_ONE));
+    admin.removePeer(ID_ONE);
+  }
+
+  @Test
+  public void testRemovePeerTableCFs() throws Exception {
+    // Add a valid peer
+    admin.addPeer(ID_ONE, KEY_ONE);
+    try {
+      admin.removePeerTableCFs(ID_ONE, "t3");
+      assertTrue(false);
+    } catch (ReplicationException e) {
+    }
+    assertEquals("", admin.getPeerTableCFs(ID_ONE));
 
+    admin.setPeerTableCFs(ID_ONE, "t1;t2:cf1");
+    try {
+      admin.removePeerTableCFs(ID_ONE, "t3");
+      assertTrue(false);
+    } catch (ReplicationException e) {
+    }
+    assertEquals("t1;t2:cf1", admin.getPeerTableCFs(ID_ONE));
+
+    try {
+      admin.removePeerTableCFs(ID_ONE, "t1:f1");
+      assertTrue(false);
+    } catch (ReplicationException e) {
+    }
+    admin.removePeerTableCFs(ID_ONE, "t1");
+    assertEquals("t2:cf1", admin.getPeerTableCFs(ID_ONE));
+
+    try {
+      admin.removePeerTableCFs(ID_ONE, "t2");
+      assertTrue(false);
+    } catch (ReplicationException e) {
+    }
+    admin.removePeerTableCFs(ID_ONE, "t2:cf1");
+    assertEquals("", admin.getPeerTableCFs(ID_ONE));
+    admin.removePeer(ID_ONE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index bc3c38f..6d8b68c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -173,89 +173,93 @@ public class TestPerTableCFReplication {
 
   @Test
   public void testParseTableCFsFromConfig() {
-    Map<String, List<String>> tabCFsMap = null;
+    Map<TableName, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
+    TableName tab1 = TableName.valueOf("tab1");
+    TableName tab2 = TableName.valueOf("tab2");
+    TableName tab3 = TableName.valueOf("tab3");
+
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
     assertEquals(1, tabCFsMap.size()); // only one table
-    assertTrue(tabCFsMap.containsKey("tab1"));   // its table name is "tab1"
-    assertFalse(tabCFsMap.containsKey("tab2"));  // not other table
-    assertEquals(null, tabCFsMap.get("tab1"));   // null cf-list,
+    assertTrue(tabCFsMap.containsKey(tab1));   // its table name is "tab1"
+    assertFalse(tabCFsMap.containsKey(tab2));  // not other table
+    assertEquals(null, tabCFsMap.get(tab1));   // null cf-list,
 
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
-    assertTrue(tabCFsMap.containsKey("tab2"));   // its table name is "tab2"
-    assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
-    assertEquals(1, tabCFsMap.get("tab2").size());   // cf-list contains only 1 cf
-    assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
+    assertTrue(tabCFsMap.containsKey(tab2));   // its table name is "tab2"
+    assertFalse(tabCFsMap.containsKey(tab1));  // not other table
+    assertEquals(1, tabCFsMap.get(tab2).size());   // cf-list contains only 1 cf
+    assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1"
 
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
-    assertTrue(tabCFsMap.containsKey("tab3"));   // its table name is "tab2"
-    assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
-    assertEquals(2, tabCFsMap.get("tab3").size());   // cf-list contains 2 cf
-    assertTrue(tabCFsMap.get("tab3").contains("cf1"));// contains "cf1"
-    assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
+    assertTrue(tabCFsMap.containsKey(tab3));   // its table name is "tab2"
+    assertFalse(tabCFsMap.containsKey(tab1));  // not other table
+    assertEquals(2, tabCFsMap.get(tab3).size());   // cf-list contains 2 cf
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));// contains "cf1"
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
     // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
-    assertTrue(tabCFsMap.containsKey("tab1"));
-    assertTrue(tabCFsMap.containsKey("tab2"));
-    assertTrue(tabCFsMap.containsKey("tab3"));
+    assertTrue(tabCFsMap.containsKey(tab1));
+    assertTrue(tabCFsMap.containsKey(tab2));
+    assertTrue(tabCFsMap.containsKey(tab3));
     // 3.2 table "tab1" : null cf-list
-    assertEquals(null, tabCFsMap.get("tab1"));
+    assertEquals(null, tabCFsMap.get(tab1));
     // 3.3 table "tab2" : cf-list contains a single cf "cf1"
-    assertEquals(1, tabCFsMap.get("tab2").size());
-    assertEquals("cf1", tabCFsMap.get("tab2").get(0));
+    assertEquals(1, tabCFsMap.get(tab2).size());
+    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
     // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
-    assertEquals(2, tabCFsMap.get("tab3").size());
-    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
-    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
+    assertEquals(2, tabCFsMap.get(tab3).size());
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
     // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
       "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
     // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
-    assertTrue(tabCFsMap.containsKey("tab1"));
-    assertTrue(tabCFsMap.containsKey("tab2"));
-    assertTrue(tabCFsMap.containsKey("tab3"));
+    assertTrue(tabCFsMap.containsKey(tab1));
+    assertTrue(tabCFsMap.containsKey(tab2));
+    assertTrue(tabCFsMap.containsKey(tab3));
     // 4.2 table "tab1" : null cf-list
-    assertEquals(null, tabCFsMap.get("tab1"));
+    assertEquals(null, tabCFsMap.get(tab1));
     // 4.3 table "tab2" : cf-list contains a single cf "cf1"
-    assertEquals(1, tabCFsMap.get("tab2").size());
-    assertEquals("cf1", tabCFsMap.get("tab2").get(0));
+    assertEquals(1, tabCFsMap.get(tab2).size());
+    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
     // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
-    assertEquals(2, tabCFsMap.get("tab3").size());
-    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
-    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
+    assertEquals(2, tabCFsMap.get(tab3).size());
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
 
     // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
     //    "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
-    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
       "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
     // 5.1 no "tab1" and "tab2", only "tab3"
     assertEquals(1, tabCFsMap.size()); // only one table
-    assertFalse(tabCFsMap.containsKey("tab1"));
-    assertFalse(tabCFsMap.containsKey("tab2"));
-    assertTrue(tabCFsMap.containsKey("tab3"));
+    assertFalse(tabCFsMap.containsKey(tab1));
+    assertFalse(tabCFsMap.containsKey(tab2));
+    assertTrue(tabCFsMap.containsKey(tab3));
    // 5.2 table "tab3" : cf-list contains "cf1" and "cf3"
-    assertEquals(2, tabCFsMap.get("tab3").size());
-    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
-    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
+    assertEquals(2, tabCFsMap.get(tab3).size());
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
  }
 
   @Test(timeout=300000)

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 41a4c14..dfe043f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -212,31 +212,31 @@ public class TestReplicationWALEntryFilters {
 
     // empty map
     userEntry = createEntry(a, b, c);
-    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
+    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
     when(peer.getTableCFs()).thenReturn(tableCfs);
     filter = new TableCfWALEntryFilter(peer);
     assertEquals(null, filter.filter(userEntry));
 
     // table bar
     userEntry = createEntry(a, b, c);
-    tableCfs = new HashMap<String, List<String>>();
-    tableCfs.put("bar", null);
+    tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(TableName.valueOf("bar"), null);
     when(peer.getTableCFs()).thenReturn(tableCfs);
     filter = new TableCfWALEntryFilter(peer);
     assertEquals(null, filter.filter(userEntry));
 
     // table foo:a
     userEntry = createEntry(a, b, c);
-    tableCfs = new HashMap<String, List<String>>();
-    tableCfs.put("foo", Lists.newArrayList("a"));
+    tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
     filter = new TableCfWALEntryFilter(peer);
     assertEquals(createEntry(a), filter.filter(userEntry));
 
     // table foo:a,c
     userEntry = createEntry(a, b, c, d);
-    tableCfs = new HashMap<String, List<String>>();
-    tableCfs.put("foo", Lists.newArrayList("a", "c"));
+    tableCfs = new HashMap<TableName, List<String>>();
+    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
     filter = new TableCfWALEntryFilter(peer);
     assertEquals(createEntry(a,c), filter.filter(userEntry));
@@ -273,6 +273,4 @@ public class TestReplicationWALEntryFilters {
       KeyValue.COMPARATOR.compare(cells1.get(i), cells2.get(i));
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index cc9e41f..6dedb2e 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -86,5 +86,17 @@ module Hbase
     def set_peer_tableCFs(id, tableCFs)
       @replication_admin.setPeerTableCFs(id, tableCFs)
     end
+
+    #----------------------------------------------------------------------------------------------
+    # Append a tableCFs config for the specified peer
+    def append_peer_tableCFs(id, tableCFs)
+      @replication_admin.appendPeerTableCFs(id, tableCFs)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Remove some tableCFs from the tableCFs config of the specified peer
+    def remove_peer_tableCFs(id, tableCFs)
+      @replication_admin.removePeerTableCFs(id, tableCFs)
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 7e948b7..f8ab347 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -335,6 +335,8 @@ Shell.load_command_group(
     show_peer_tableCFs
     set_peer_tableCFs
     list_replicated_tables
+    append_peer_tableCFs
+    remove_peer_tableCFs
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
new file mode 100644
index 0000000..3919b20
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
@@ -0,0 +1,41 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class AppendPeerTableCFs< Command
+      def help
+        return <<-EOF
+Append a replicable table-cf config for the specified peer
+Examples:
+
+  # append a table / table-cf to be replicable for a peer
+  hbase> append_peer_tableCFs '2', "table4:cfA,cfB"
+
+EOF
+      end
+
+      def command(id, table_cfs)
+        format_simple_command do
+          replication_admin.append_peer_tableCFs(id, table_cfs)
+        end
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/511b20a2/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
new file mode 100644
index 0000000..5b15b52
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class RemovePeerTableCFs < Command
+      def help
+        return <<-EOF
+Remove a table / table-cf from the table-cfs config for the specified peer
+Examples:
+
+  # Remove a table / table-cf from the replicable table-cfs for a peer
+  hbase> remove_peer_tableCFs '2', "table1"
+  hbase> remove_peer_tableCFs '2', "table1:cf1"
+
+EOF
+      end
+
+      def command(id, table_cfs)
+        format_simple_command do
+          replication_admin.remove_peer_tableCFs(id, table_cfs)
+        end
+      end
+    end
+  end
+end


Mime
View raw message