hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration
Date Wed, 01 Feb 2017 00:17:43 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 c937e9786 -> 3aac1b688


HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration

Signed-off-by: tedyu <yuzhihong@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3aac1b68
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3aac1b68
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3aac1b68

Branch: refs/heads/branch-1
Commit: 3aac1b6884b43fbfd7a91d0f5cc765214e16d9a7
Parents: c937e97
Author: gjacoby <gjacoby@salesforce.com>
Authored: Tue Jan 31 15:41:12 2017 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Jan 31 16:16:20 2017 -0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 31 +++++++++---
 .../replication/BaseReplicationEndpoint.java    | 16 +++++++
 .../replication/TestReplicationEndpoint.java    | 50 ++++++++++++++++++--
 3 files changed, 87 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3aac1b68/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 76573f4..d462f38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -36,12 +36,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
@@ -89,7 +84,8 @@ public class ReplicationAdmin implements Closeable {
 
   public static final String TNAME = "tableName";
   public static final String CFNAME = "columnFamlyName";
-
+  public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
+      = "hbase.replication.source.custom.walentryfilters";
   // only Global for now, can add other type
   // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
   public static final String REPLICATIONTYPE = "replicationType";
@@ -203,6 +199,7 @@ public class ReplicationAdmin implements Closeable {
     if (tableCfs != null) {
       peerConfig.setTableCFsMap(tableCfs);
     }
+    checkConfiguredWALEntryFilters(peerConfig);
     this.replicationPeers.addPeer(id, peerConfig);
   }
 
@@ -212,11 +209,13 @@ public class ReplicationAdmin implements Closeable {
    * @param peerConfig configuration for the replication slave cluster
    */
   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+    checkConfiguredWALEntryFilters(peerConfig);
     this.replicationPeers.addPeer(id, peerConfig);
   }
 
   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
     throws ReplicationException {
+    checkConfiguredWALEntryFilters(peerConfig);
     this.replicationPeers.updatePeerConfig(id, peerConfig);
   }
 
@@ -692,4 +691,22 @@ public class ReplicationAdmin implements Closeable {
     }
     return true;
   }
+
+  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
+    throws ReplicationException {
+    String filterCSV = peerConfig.getConfiguration().
+        get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+    if (filterCSV != null && !filterCSV.isEmpty()) {
+      String[] filters = filterCSV.split(",");
+      for (String filter : filters) {
+        try {
+          Class clazz = Class.forName(filter);
+          Object o = clazz.newInstance();
+        } catch (Exception e) {
+          throw new ReplicationException("Configured WALEntryFilter " + filter +
+              " could not be created. Failing add/update " + "peer operation.", e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3aac1b68/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index bc73f74..71a222a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractService;
 
+import static org.apache.hadoop.hbase.client.replication.ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY;
+
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
  * class rather than implementing {@link ReplicationEndpoint} directly for better backwards
@@ -75,6 +77,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService
     if (tableCfFilter != null) {
       filters.add(tableCfFilter);
     }
+    if (ctx != null && ctx.getPeerConfig() != null) {
+      String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+      if (filterNameCSV != null && !filterNameCSV.isEmpty()) {
+          String[] filterNames = filterNameCSV.split(",");
+          for (String filterName : filterNames) {
+              try {
+                  Class<?> clazz = Class.forName(filterName);
+                  filters.add((WALEntryFilter) clazz.newInstance());
+                } catch (Exception e) {
+                  LOG.error("Unable to create WALEntryFilter " + filterName, e);
+                }
+            }
+        }
+    }
     return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3aac1b68/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index b9d5582..7128adc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Waiter;
@@ -238,9 +239,13 @@ public class TestReplicationEndpoint extends TestReplicationBase {
 
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
-    admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that we can create multiple WALFilters reflectively
+    rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+        EverythingPassesWALEntryFilter.class.getName() + ","
+            + EverythingPassesWALEntryFilterSubclass.class.getName());
+    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
     // now replicate some data.
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
       doPut(connection, Bytes.toBytes("row1"));
@@ -256,9 +261,31 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     });
 
     Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+    //make sure our reflectively created filter is in the filter chain
+    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
   }
 
+  @Test (timeout=120000, expected=ReplicationException.class)
+  public void testWALEntryFilterAddValidation() throws Exception {
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that adding a peer with an unloadable WALEntryFilter class name is rejected
+        rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+                "IAmNotARealWalEntryFilter");
+    admin.addPeer("testWALEntryFilterAddValidation", rpc);
+  }
+
+      @Test (timeout=120000, expected=ReplicationException.class)
+  public void testWALEntryFilterUpdateValidation() throws Exception {
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that updating a peer config with an unloadable WALEntryFilter class name is rejected
+        rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+                "IAmNotARealWalEntryFilter");
+    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
+  }
+
 
   @Test
   public void testMetricsSourceBaseSourcePassthrough(){
@@ -487,4 +514,21 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       });
     }
   }
+
+  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
+    private static boolean passedEntry = false;
+    @Override
+    public Entry filter(Entry entry) {
+      passedEntry = true;
+      return entry;
+    }
+
+    public static boolean hasPassedAnEntry(){
+        return passedEntry;
+    }
+  }
+
+  public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
+
+  }
 }


Mime
View raw message