hbase-commits mailing list archives

From: e...@apache.org
Subject: [1/3] HBASE-11367 Pluggable replication endpoint
Date: Mon, 14 Jul 2014 23:23:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 4824b0dea -> 463d52d8c


http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
new file mode 100644
index 0000000..e4ec0bc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster.
+ * From the slave cluster it selects a random subset of region servers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ */
+@InterfaceAudience.Private
+public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
+
+  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
+  private HConnection conn;
+
+  private Configuration conf;
+
+  // How long should we sleep for each retry
+  private long sleepForRetries;
+
+  // Maximum number of retries before taking bold actions
+  private int maxRetriesMultiplier;
+  // Socket timeouts require even bolder actions since we don't want to DDOS
+  private int socketTimeoutMultiplier;
+  // Metrics for this source
+  private MetricsSource metrics;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
+  private boolean peersSelected = false;
+
+  @Override
+  public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    decorateConf();
+    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
+    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
+        maxRetriesMultiplier * maxRetriesMultiplier);
+    // TODO: Either this connection is replication-specific, or we should make it so, and
+    // apply replication-specific settings such as the compression or codec to use when
+    // passing Cells.
+    this.conn = HConnectionManager.createConnection(this.conf);
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);
+    this.metrics = context.getMetrics();
+    // ReplicationQueueInfo parses the peerId out of the znode for us
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+  }
+
+  private void decorateConf() {
+    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+    if (StringUtils.isNotEmpty(replicationCodec)) {
+      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+    }
+  }
+
+  private void connectToPeers() {
+    getRegionServers();
+
+    int sleepMultiplier = 1;
+
+    // Connect to peer cluster first, unless we have to stop
+    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+      replicationSinkMgr.chooseSinks();
+      if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is multiplied
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  @Override
+  public boolean replicate(ReplicateContext replicateContext) {
+    List<HLog.Entry> entries = replicateContext.getEntries();
+    int sleepMultiplier = 1;
+    while (this.isRunning()) {
+      if (!peersSelected) {
+        connectToPeers();
+        peersSelected = true;
+      }
+
+      if (!isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicating " + entries.size() +
+              " entries of total size " + replicateContext.getSize());
+        }
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+            entries.toArray(new HLog.Entry[entries.size()]));
+
+        // update metrics
+        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+        return true;
+
+      } catch (IOException ioe) {
+        // Didn't ship anything, but must still age the last time we did
+        this.metrics.refreshAgeOfLastShippedOp();
+        if (ioe instanceof RemoteException) {
+          ioe = ((RemoteException) ioe).unwrapRemoteException();
+          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+          if (ioe instanceof TableNotFoundException) {
+            if (sleepForRetries("A table is missing in the peer cluster. "
+                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+              sleepMultiplier++;
+            }
+          }
+        } else {
+          if (ioe instanceof SocketTimeoutException) {
+            // This exception means we waited for more than 60s and nothing
+            // happened; the cluster is alive, and calling it again right away,
+            // even as a test, just makes things worse.
+            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
+              "call to the remote cluster timed out, which is usually " +
+              "caused by a machine failure or a massive slowdown",
+              this.socketTimeoutMultiplier);
+          } else if (ioe instanceof ConnectException) {
+            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+            replicationSinkMgr.chooseSinks();
+          } else {
+            LOG.warn("Can't replicate because of a local or network error: ", ioe);
+          }
+        }
+
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+    return false; // in case we exited before replicating
+  }
+
+  protected boolean isPeerEnabled() {
+    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
+  }
+
+  @Override
+  protected void doStop() {
+    disconnect(); //don't call super.doStop()
+    if (this.conn != null) {
+      try {
+        this.conn.close();
+        this.conn = null;
+      } catch (IOException e) {
+        LOG.warn("Failed to close the connection", e);
+      }
+    }
+    notifyStopped();
+  }
+}
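
The class above is the default endpoint; the point of HBASE-11367 is that
replicate(ReplicateContext) can be supplied by any implementation. A minimal
custom endpoint might look like the following sketch. It assumes a
BaseReplicationEndpoint convenience superclass (introduced by this issue but
not shown in this message) that stores the init() context and exposes the
Guava Service hooks; the class name and behavior below are hypothetical.

// Hypothetical example, NOT part of this patch: a custom endpoint that ships
// WAL entries to an external system instead of a peer HBase cluster.
package org.example.replication;

import java.util.UUID;

import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

public class SampleReplicationEndpoint extends BaseReplicationEndpoint {

  // A stable fake peer UUID; it must differ from the source cluster's own id
  // unless canReplicateToSameCluster() is overridden to return true.
  private final UUID peerUUID = UUID.randomUUID();

  @Override
  public UUID getPeerUUID() {
    return peerUUID;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    for (HLog.Entry entry : replicateContext.getEntries()) {
      // Ship the entry to the external sink here. Return true only after the
      // whole batch is durably acknowledged; returning false (or throwing)
      // makes ReplicationSource retry the same batch.
    }
    return true;
  }

  @Override
  protected void doStart() {
    notifyStarted(); // nothing to connect to in this sketch
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }
}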

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b38a0c8..94dec7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,13 +22,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This class is for maintaining the various replication statistics for a source and publishing them
  * through the metrics interfaces.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class MetricsSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
@@ -152,7 +153,7 @@ public class MetricsSource {
     rms.incCounters(shippedKBsKey, sizeInKB);
     rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
   }
-  
+
   /** increase the byte number read by source from log file */
   public void incrLogReadInBytes(long readInBytes) {
     rms.incCounters(logReadInBytesKey, readInBytes);

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 839db9b..2104268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -61,7 +60,7 @@ public class ReplicationSinkManager {
 
   private final String peerClusterId;
 
-  private final ReplicationPeers replicationPeers;
+  private final HBaseReplicationEndpoint endpoint;
 
   // Count of "bad replication sink" reports per peer sink
   private final Map<ServerName, Integer> badReportCounts;
@@ -85,15 +84,15 @@ public class ReplicationSinkManager {
    * Instantiate for a single replication peer cluster.
    * @param conn connection to the peer cluster
    * @param peerClusterId identifier of the peer cluster
-   * @param replicationPeers manages peer clusters being replicated to
+   * @param endpoint replication endpoint for inter cluster replication
    * @param conf HBase configuration, used for determining replication source ratio and bad peer
    *          threshold
    */
   public ReplicationSinkManager(HConnection conn, String peerClusterId,
-      ReplicationPeers replicationPeers, Configuration conf) {
+      HBaseReplicationEndpoint endpoint, Configuration conf) {
     this.conn = conn;
     this.peerClusterId = peerClusterId;
-    this.replicationPeers = replicationPeers;
+    this.endpoint = endpoint;
     this.badReportCounts = Maps.newHashMap();
     this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
     this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
@@ -107,8 +106,7 @@ public class ReplicationSinkManager {
    * @return a replication sink to replicate to
    */
   public SinkPeer getReplicationSink() throws IOException {
-    if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
-                                                        > this.lastUpdateToPeers) {
+    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers) {
       LOG.info("Current list of sinks is out of date, updating");
       chooseSinks();
     }
@@ -143,8 +141,7 @@ public class ReplicationSinkManager {
   }
 
   void chooseSinks() {
-    List<ServerName> slaveAddresses =
-                        replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+    List<ServerName> slaveAddresses = endpoint.getRegionServers();
     Collections.shuffle(slaveAddresses, random);
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     sinks = slaveAddresses.subList(0, numSinks);
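
The ratio-based selection described in the HBaseInterClusterReplicationEndpoint
javadoc reduces to the three lines at the end of chooseSinks() above: shuffle
the peer's region servers and keep the first ceil(n * ratio) of them. A worked
example of that arithmetic, with illustrative numbers:

// Worked example of the arithmetic in chooseSinks() above (a fragment, not
// part of the patch; the numbers are illustrative).
int slaveCount = 20;         // region servers reported by the peer cluster
float ratio = 0.1f;          // "replication.source.ratio"
int numSinks = (int) Math.ceil(slaveCount * ratio);  // ceil(2.0) -> 2 sinks
// With e.g. 5 slaves, ceil(0.5) still yields 1 sink; only an empty slave
// list produces 0 sinks, which connectToPeers() loops on until sinks appear.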

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4e2106d..87cbcc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -41,27 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
 
 /**
  * Class that handles the source of a replication stream.
@@ -82,9 +76,9 @@ public class ReplicationSource extends Thread
   public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
   private PriorityBlockingQueue<Path> queue;
-  private HConnection conn;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
+
   private Configuration conf;
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
@@ -118,8 +112,6 @@ public class ReplicationSource extends Thread
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
-  // Socket timeouts require even bolder actions since we don't want to DDOS
-  private int socketTimeoutMultiplier;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
   // Current size of data we need to replicate
@@ -130,10 +122,14 @@ public class ReplicationSource extends Thread
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
-  // Handles connecting to peer region servers
-  private ReplicationSinkManager replicationSinkMgr;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // ReplicationEndpoint which will handle the actual replication
+  private ReplicationEndpoint replicationEndpoint;
+  // A filter (or a chain of filters) for the WAL entries.
+  private WALEntryFilter walEntryFilter;
+  // Context for ReplicationEndpoint#replicate()
+  private ReplicationEndpoint.ReplicateContext replicateContext;
   // throttler
   private ReplicationThrottler throttler;
 
@@ -145,30 +141,30 @@ public class ReplicationSource extends Thread
    * @param manager replication manager to ping to
    * @param stopper     the atomic boolean to use to stop the regionserver
    * @param peerClusterZnode the name of our znode
+   * @param clusterId unique UUID for the cluster
+   * @param replicationEndpoint the replication endpoint implementation
+   * @param metrics metrics for replication source
    * @throws IOException
    */
+  @Override
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException {
+      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      final MetricsSource metrics)
+          throws IOException {
     this.stopper = stopper;
-    this.conf = HBaseConfiguration.create(conf);
+    this.conf = conf;
     decorateConf();
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
     this.replicationQueueNbCapacity =
         this.conf.getInt("replication.source.nb.capacity", 25000);
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
-    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
-        maxRetriesMultiplier * maxRetriesMultiplier);
     this.queue =
         new PriorityBlockingQueue<Path>(
             this.conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
-    // TODO: This connection is replication specific or we should make it particular to
-    // replication and make replication specific settings such as compression or codec to use
-    // passing Cells.
-    this.conn = HConnectionManager.getConnection(this.conf);
     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
@@ -177,7 +173,7 @@ public class ReplicationSource extends Thread
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
-    this.metrics = new MetricsSource(peerClusterZnode);
+    this.metrics = metrics;
     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     this.clusterId = clusterId;
 
@@ -185,8 +181,10 @@ public class ReplicationSource extends Thread
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+    this.replicationEndpoint = replicationEndpoint;
+
+    this.replicateContext = new ReplicationEndpoint.ReplicateContext();
   }
 
   private void decorateConf() {
@@ -209,30 +207,48 @@ public class ReplicationSource extends Thread
   }
 
   private void uninitialize() {
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-      } catch (IOException e) {
-        LOG.debug("Attempt to close connection failed", e);
-      }
-    }
     LOG.debug("Source exiting " + this.peerId);
     metrics.clear();
+    if (replicationEndpoint.state() == Service.State.STARTING
+        || replicationEndpoint.state() == Service.State.RUNNING) {
+      replicationEndpoint.stopAndWait();
+    }
   }
 
   @Override
   public void run() {
-    connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
     if (!this.isActive()) {
       uninitialize();
       return;
     }
 
+    try {
+      // start the endpoint, connect to the cluster
+      Service.State state = replicationEndpoint.start().get();
+      if (state != Service.State.RUNNING) {
+        LOG.warn("ReplicationEndpoint was not started. Exiting");
+        uninitialize();
+        return;
+      }
+    } catch (Exception ex) {
+      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+      throw new RuntimeException(ex);
+    }
+
+    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
+      (WALEntryFilter)new SystemTableWALEntryFilter());
+    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
+    if (filterFromEndpoint != null) {
+      filters.add(filterFromEndpoint);
+    }
+    this.walEntryFilter = new ChainWALEntryFilter(filters);
+
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
     while (this.isActive() && this.peerClusterId == null) {
-      this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
+      this.peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isActive() && this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
@@ -250,9 +266,10 @@ public class ReplicationSource extends Thread
 
     // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
     // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId)) {
+    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
-          + peerClusterId);
+          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+          + replicationEndpoint.getClass().getName(), null, false);
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 
@@ -397,8 +414,8 @@ public class ReplicationSource extends Thread
    * entries
    * @throws IOException
    */
-  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
-      throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
+      List<HLog.Entry> entries) throws IOException{
     long seenEntries = 0;
     if (LOG.isTraceEnabled()) {
       LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -409,18 +426,22 @@ public class ReplicationSource extends Thread
     HLog.Entry entry =
         this.repLogReader.readNextAndSetPosition();
     while (entry != null) {
-      WALEdit edit = entry.getEdit();
       this.metrics.incrLogEditsRead();
       seenEntries++;
-      // Remove all KVs that should not be replicated
-      HLogKey logKey = entry.getKey();
+
       // don't replicate if the log entries have already been consumed by the cluster
-      if (!logKey.getClusterIds().contains(peerClusterId)) {
-        removeNonReplicableEdits(entry);
-        // Don't replicate catalog entries, if the WALEdit wasn't
-        // containing anything to replicate and if we're currently not set to replicate
-        if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
-            edit.size() != 0) {
+      if (replicationEndpoint.canReplicateToSameCluster()
+          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+        // Remove all KVs that should not be replicated
+        entry = walEntryFilter.filter(entry);
+        WALEdit edit = null;
+        HLogKey logKey = null;
+        if (entry != null) {
+          edit = entry.getEdit();
+          logKey = entry.getKey();
+        }
+
+        if (edit != null && edit.size() != 0) {
           //Mark that the current cluster has the change
           logKey.addClusterId(clusterId);
           currentNbOperations += countDistinctRowKeys(edit);
@@ -451,20 +472,6 @@ public class ReplicationSource extends Thread
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private void connectToPeers() {
-    int sleepMultiplier = 1;
-
-    // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-      replicationSinkMgr.chooseSinks();
-      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-  }
-
   /**
    * Poll for the next path
    * @return true if a path was obtained, false if not
@@ -594,8 +601,8 @@ public class ReplicationSource extends Thread
   /*
    * Checks whether the current log file is empty, and it is not a recovered queue. This is to
   * handle the scenario when, in an idle cluster, there is no entry in the current log and we keep on
-   * trying to read the log file and get EOFEception. In case of a recovered queue the last log file
-   * may be empty, and we don't want to retry that.
+   * trying to read the log file and get EOFException. In case of a recovered queue the last log
+   * file may be empty, and we don't want to retry that.
    */
   private boolean isCurrentLogEmpty() {
     return (this.repLogReader.getPosition() == 0 &&
@@ -622,47 +629,6 @@ public class ReplicationSource extends Thread
   }
 
   /**
-   * We only want KVs that are scoped other than local
-   * @param entry The entry to check for replication
-   */
-  protected void removeNonReplicableEdits(HLog.Entry entry) {
-    String tabName = entry.getKey().getTablename().getNameAsString();
-    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
-    Map<String, List<String>> tableCFs = null;
-    try {
-      tableCFs = this.replicationPeers.getTableCFs(peerId);
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + peerId +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
-    }
-    int size = kvs.size();
-
-    // clear kvs(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list (empty tableCFs means all table are replicable)
-    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
-      kvs.clear();
-    } else {
-      NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
-      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
-      for (int i = size - 1; i >= 0; i--) {
-        KeyValue kv = kvs.get(i);
-        // The scope will be null or empty if
-        // there's nothing to replicate in that WALEdit
-        // ignore(remove) kv if its cf isn't in the replicable cf list
-        // (empty cfs means all cfs of this table are replicable)
-        if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
-            (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
-          kvs.remove(i);
-        }
-      }
-    }
-
-    if (kvs.size() < size/2) {
-      kvs.trimToSize();
-    }
-  }
-
-  /**
    * Count the number of different row keys in the given edit because of
    * mini-batching. We assume that there's at least one KV in the WALEdit.
    * @param edit edit to count row keys from
@@ -692,13 +658,6 @@ public class ReplicationSource extends Thread
       return;
     }
     while (this.isActive()) {
-      if (!isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      SinkPeer sinkPeer = null;
       try {
         if (this.throttler.isEnabled()) {
           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
@@ -719,14 +678,15 @@ public class ReplicationSource extends Thread
             this.throttler.resetStartTick();
           }
         }
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + currentSize);
+        replicateContext.setEntries(entries).setSize(currentSize);
+
+        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+        boolean replicated = replicationEndpoint.replicate(replicateContext);
+
+        if (!replicated) {
+          continue;
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new HLog.Entry[entries.size()]));
+
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
@@ -745,50 +705,9 @@ public class ReplicationSource extends Thread
               + this.totalReplicatedOperations + " operations");
         }
         break;
-
-      } catch (IOException ioe) {
-        // Didn't ship anything, but must still age the last time we did
-        this.metrics.refreshAgeOfLastShippedOp();
-        if (ioe instanceof RemoteException) {
-          ioe = ((RemoteException) ioe).unwrapRemoteException();
-          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
-          if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-              sleepMultiplier++;
-            }
-            // current thread might be interrupted to terminate
-            // directly go back to while() for confirm this
-            if (isInterrupted()) {
-              continue;
-            }
-          }
-        } else {
-          if (ioe instanceof SocketTimeoutException) {
-            // This exception means we waited for more than 60s and nothing
-            // happened, the cluster is alive and calling it right away
-            // even for a test just makes things worse.
-            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
-              "call to the remote cluster timed out, which is usually " +
-              "caused by a machine failure or a massive slowdown",
-              this.socketTimeoutMultiplier);
-            // current thread might be interrupted to terminate
-            // directly go back to while() for confirm this
-            if (isInterrupted()) {
-              continue;
-            }
-          } else if (ioe instanceof ConnectException) {
-            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
-            replicationSinkMgr.chooseSinks();
-          } else {
-            LOG.warn("Can't replicate because of a local or network error: ", ioe);
-          }
-        }
-
-        if (sinkPeer != null) {
-          replicationSinkMgr.reportBadSink(sinkPeer);
-        }
-        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+      } catch (Exception ex) {
+        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:", ex);
+        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
           sleepMultiplier++;
         }
       }
@@ -801,7 +720,7 @@ public class ReplicationSource extends Thread
    * @return true if the peer is enabled, otherwise false
    */
   protected boolean isPeerEnabled() {
-    return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
+    return this.replicationPeers.getStatusOfPeer(this.peerId);
   }
 
   /**
@@ -835,10 +754,12 @@ public class ReplicationSource extends Thread
     return false;
   }
 
+  @Override
   public void startup() {
     String n = Thread.currentThread().getName();
     Thread.UncaughtExceptionHandler handler =
         new Thread.UncaughtExceptionHandler() {
+          @Override
           public void uncaughtException(final Thread t, final Throwable e) {
             LOG.error("Unexpected exception in ReplicationSource," +
               " currentPath=" + currentPath, e);
@@ -849,11 +770,17 @@ public class ReplicationSource extends Thread
         this.peerClusterZnode, handler);
   }
 
+  @Override
   public void terminate(String reason) {
     terminate(reason, null);
   }
 
+  @Override
   public void terminate(String reason, Exception cause) {
+    terminate(reason, cause, true);
+  }
+
+  public void terminate(String reason, Exception cause, boolean join) {
     if (cause == null) {
       LOG.info("Closing source "
           + this.peerClusterZnode + " because: " + reason);
@@ -864,17 +791,33 @@ public class ReplicationSource extends Thread
     }
     this.running = false;
     this.interrupt();
-    Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
+    ListenableFuture<Service.State> future = null;
+    if (this.replicationEndpoint != null) {
+      future = this.replicationEndpoint.stop();
+    }
+    if (join) {
+      Threads.shutdown(this, this.sleepForRetries);
+      if (future != null) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.warn("Got exception:" + e);
+        }
+      }
+    }
   }
 
+  @Override
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }
 
+  @Override
   public String getPeerClusterId() {
     return this.peerId;
   }
 
+  @Override
   public Path getCurrentPath() {
     return this.currentPath;
   }
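
run() now chains a SystemTableWALEntryFilter with whatever the endpoint
returns from getWALEntryfilter(), and readAllEntriesToReplicateOrNextFile()
applies the chain to each entry. The contract visible above: filter(entry)
returns the entry (possibly trimmed) to keep it, or null to drop it. A
hypothetical endpoint-supplied filter could look like this (the class and
table name are made up for the example):

// Hypothetical filter matching the WALEntryFilter usage in run() above:
// returning null drops the entry, returning it keeps it.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.WALEntryFilter;

public class SkipOneTableWALEntryFilter implements WALEntryFilter {
  private final TableName skipped = TableName.valueOf("do_not_replicate");

  @Override
  public HLog.Entry filter(HLog.Entry entry) {
    if (entry.getKey().getTablename().equals(skipped)) {
      return null; // filtered out entirely
    }
    return entry; // kept; a filter may also trim KVs and return the entry
  }
}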

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..6388d9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 
@@ -50,7 +51,8 @@ public interface ReplicationSourceInterface {
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException;
+      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      final MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7b4cd83..db9c505 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,9 +43,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 
@@ -115,7 +119,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
       final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
-    //Generally, reading is more than modifying. 
+    //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
@@ -194,7 +198,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server hlog queues
    */
   protected void init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeers()) {
+    for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -221,9 +225,12 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
+    ReplicationPeerConfig peerConfig
+      = replicationPeers.getReplicationPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, stopper, id, this.clusterId);
+          this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -254,7 +261,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     this.replicationQueues.removeQueue(peerId);
     if (closeConnection) {
-      this.replicationPeers.disconnectFromPeer(peerId);
+      this.replicationPeers.peerRemoved(peerId);
     }
   }
 
@@ -340,7 +347,9 @@ public class ReplicationSourceManager implements ReplicationListener {
   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
-      final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
+      final Stoppable stopper, final String peerId, final UUID clusterId,
+      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
+          throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -351,9 +360,32 @@ public class ReplicationSourceManager implements ReplicationListener {
       LOG.warn("Passed replication source implementation throws errors, " +
           "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
+    }
 
+    ReplicationEndpoint replicationEndpoint = null;
+    try {
+      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+      if (replicationEndpointImpl == null) {
+        // Default to HBase inter-cluster replication endpoint
+        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
+      }
+      @SuppressWarnings("rawtypes")
+      Class c = Class.forName(replicationEndpointImpl);
+      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+    } catch (Exception e) {
+      LOG.warn("Passed replication endpoint implementation throws errors", e);
+      throw new IOException(e);
     }
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
+
+    MetricsSource metrics = new MetricsSource(peerId);
+    // init replication source
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
+      clusterId, replicationEndpoint, metrics);
+
+    // init replication endpoint
+    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+      fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
+
     return src;
   }
 
@@ -441,7 +473,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void peerListChanged(List<String> peerIds) {
     for (String id : peerIds) {
       try {
-        boolean added = this.replicationPeers.connectToPeer(id);
+        boolean added = this.replicationPeers.peerAdded(id);
         if (added) {
           addSource(id);
         }
@@ -507,10 +539,26 @@ public class ReplicationSourceManager implements ReplicationListener {
       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
         try {
+          // the peerId of a recovered queue may not correspond directly to an actual peer;
+          // parse the actual peer id out of the queue's znode name.
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+          String actualPeerId = replicationQueueInfo.getPeerId();
+          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          ReplicationPeerConfig peerConfig = null;
+          try {
+            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
+          } catch (ReplicationException ex) {
+            LOG.warn("Received exception while getting replication peer config, skipping replay"
+                + ex);
+          }
+          if (peer == null || peerConfig == null) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+            continue;
+          }
+
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                stopper, peerId, this.clusterId);
-          if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
+                stopper, peerId, this.clusterId, peerConfig, peer);
+          if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;
           }
@@ -561,7 +609,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       stats.append(source.getStats() + "\n");
     }
     for (ReplicationSourceInterface oldSource : oldsources) {
-      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
+      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
       stats.append(oldSource.getStats()+ "\n");
     }
     return stats.toString();
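
getReplicationSource() above instantiates the endpoint class named by the
peer's ReplicationPeerConfig, defaulting to HBaseInterClusterReplicationEndpoint
when none is configured. Wiring a custom endpoint to a peer would then look
roughly like this sketch (modeled on the TestReplicationEndpoint setup later
in this message; the setReplicationEndpointImpl name and the addPeer overload
are assumptions, and SampleReplicationEndpoint is the hypothetical class
sketched earlier):

// Sketch (a method-body fragment): register a peer whose traffic is handled
// by a custom endpoint.
Configuration conf = HBaseConfiguration.create();
ReplicationAdmin admin = new ReplicationAdmin(conf);
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
    .setClusterKey(ZKUtil.getZooKeeperClusterKey(conf))
    .setReplicationEndpointImpl(SampleReplicationEndpoint.class.getName());
admin.addPeer("samplePeer", peerConfig, null); // null tableCFs: replicate everything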

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index fa744ff..77bc64e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -25,6 +29,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -117,5 +123,36 @@ public class TestReplicationAdmin {
     admin.removePeer(ID_ONE);
   }
 
+  @Test
+  public void testGetTableCfsStr() {
+    // opposite of TestPerTableCFReplication#testParseTableCFsFromConfig()
+
+    Map<TableName, List<String>> tabCFsMap = null;
+
+    // 1. null or empty string, result should be null
+    assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+
+    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), null);   // its table name is "tab1"
+    assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1"));
+    assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3"));
+    assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), null);
+    tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1"));
+    tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3"));
+    assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index fa3dda6..13a18ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
@@ -40,7 +41,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-      UUID clusterId) throws IOException {
+      UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
+          throws IOException {
 
     this.manager = manager;
     this.peerClusterId = peerClusterId;

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index ee102fc..ff77a94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -175,30 +175,30 @@ public class TestPerTableCFReplication {
     Map<String, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab1"));   // its table name is "tab1"
     assertFalse(tabCFsMap.containsKey("tab2"));  // not other table
     assertEquals(null, tabCFsMap.get("tab1"));   // null cf-list,
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab2"));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
     assertEquals(1, tabCFsMap.get("tab2").size());   // cf-list contains only 1 cf
     assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab3"));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
@@ -207,7 +207,7 @@ public class TestPerTableCFReplication {
     assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
     // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey("tab1"));
@@ -225,7 +225,8 @@ public class TestPerTableCFReplication {
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
     // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+      "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
     // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey("tab1"));
@@ -243,7 +244,8 @@ public class TestPerTableCFReplication {
 
     // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
     //    "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+      "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
     // 5.1 no "tab1" and "tab2", only "tab3"
     assertEquals(1, tabCFsMap.size()); // only one table
     assertFalse(tabCFsMap.containsKey("tab1"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
new file mode 100644
index 0000000..38aaa7a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests ReplicationSource and ReplicationEndpoint interactions
+ */
+@Category(MediumTests.class)
+public class TestReplicationEndpoint extends TestReplicationBase {
+
+  static int numRegionServers;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestReplicationBase.setUpBeforeClass();
+    utility2.shutdownMiniCluster(); // we don't need the second cluster
+    admin.removePeer("2");
+    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+    // check stop is called
+    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
+  }
+
+  @Before
+  public void setup() throws FailedLogCloseException, IOException {
+    ReplicationEndpointForTest.constructedCount.set(0);
+    ReplicationEndpointForTest.startedCount.set(0);
+    ReplicationEndpointForTest.replicateCount.set(0);
+    ReplicationEndpointForTest.lastEntries = null;
+    for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
+      utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
+    }
+  }
+
+  @Test
+  public void testCustomReplicationEndpoint() throws Exception {
+    // test installing a custom replication endpoint other than the default one.
+    admin.addPeer("testCustomReplicationEndpoint",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    // check whether the class has been constructed and started
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.constructedCount.get() >= numRegionServers;
+      }
+    });
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
+      }
+    });
+
+    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
+
+    // now replicate some data.
+    doPut(Bytes.toBytes("row42"));
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.replicateCount.get() >= 1;
+      }
+    });
+
+    doAssert(Bytes.toBytes("row42"));
+
+    admin.removePeer("testCustomReplicationEndpoint");
+  }
+
+  @Test
+  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
+    admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+    // now replicate some data.
+    doPut(row);
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointReturningFalse.replicated.get();
+      }
+    });
+    if (ReplicationEndpointReturningFalse.ex.get() != null) {
+      throw ReplicationEndpointReturningFalse.ex.get();
+    }
+
+    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
+  }
+
+  @Test
+  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
+    admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+    // now replicate some data.
+    doPut(Bytes.toBytes("row1"));
+    doPut(row);
+    doPut(Bytes.toBytes("row2"));
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.replicateCount.get() >= 1;
+      }
+    });
+
+    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
+  }
+
+  private void doPut(byte[] row) throws IOException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+    htable1.close();
+  }
+
+  private static void doAssert(byte[] row) throws Exception {
+    if (ReplicationEndpointForTest.lastEntries == null) {
+      return; // first call, no entries delivered yet
+    }
+    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
+    List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
+      kvs.get(0).getRowLength(), row, 0, row.length));
+  }
+
+  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
+    static UUID uuid = UUID.randomUUID();
+    static AtomicInteger constructedCount = new AtomicInteger();
+    static AtomicInteger startedCount = new AtomicInteger();
+    static AtomicInteger stoppedCount = new AtomicInteger();
+    static AtomicInteger replicateCount = new AtomicInteger();
+    static volatile List<HLog.Entry> lastEntries = null;
+
+    public ReplicationEndpointForTest() {
+      constructedCount.incrementAndGet();
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return uuid;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      lastEntries = replicateContext.entries;
+      return true;
+    }
+
+    @Override
+    protected void doStart() {
+      startedCount.incrementAndGet();
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      stoppedCount.incrementAndGet();
+      notifyStopped();
+    }
+  }
+
+  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+    static AtomicBoolean replicated = new AtomicBoolean(false);
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        // check row
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+
+      super.replicate(replicateContext);
+
+      replicated.set(replicateCount.get() > 10); // return false for the first 10 calls, then true
+      return replicated.get();
+    }
+  }
+
+  // endpoint whose WALEntryFilter accepts only "row" and drops edits for all other rows
+  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        super.replicate(replicateContext);
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+      return true;
+    }
+
+    @Override
+    public WALEntryFilter getWALEntryfilter() {
+      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
+        @Override
+        public Entry filter(Entry entry) {
+          ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+          int size = kvs.size();
+          for (int i = size-1; i >= 0; i--) {
+            KeyValue kv = kvs.get(i);
+            if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+              row, 0, row.length)) {
+              kvs.remove(i);
+            }
+          }
+          return entry;
+        }
+      });
+    }
+  }
+}

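The setup in testCustomReplicationEndpoint above doubles as a usage template for the new pluggable API: a peer is pointed at an endpoint class through ReplicationPeerConfig. A condensed sketch, where "customPeer", the cluster key, and MyReplicationEndpoint (standing in for any BaseReplicationEndpoint subclass, like ReplicationEndpointForTest above) are placeholders:

    // Sketch: registering a peer backed by a custom endpoint.
    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("zk-quorum:2181:/hbase")               // slave cluster's ZK cluster key
        .setReplicationEndpointImpl(MyReplicationEndpoint.class.getName());
    replicationAdmin.addPeer("customPeer", peerConfig, null);  // null tableCFs = all tables
    // ... later, tear the peer down again
    replicationAdmin.removePeer("customPeer");
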
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fd003ad..e560620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
     } catch (IllegalArgumentException e) {
     }
     try {
-      rp.getStatusOfConnectedPeer("bogus");
+      rp.getStatusOfPeer("bogus");
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
     } catch (IllegalArgumentException e) {
     }
-    assertFalse(rp.connectToPeer("bogus"));
-    rp.disconnectFromPeer("bogus");
-    assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
-    assertNull(rp.getPeerUUID("bogus"));
+    assertFalse(rp.peerAdded("bogus"));
+    rp.peerRemoved("bogus");
+
     assertNull(rp.getPeerConf("bogus"));
-    assertNumberOfPeers(0, 0);
+    assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, KEY_ONE);
-    assertNumberOfPeers(0, 1);
-    rp.addPeer(ID_TWO, KEY_TWO);
-    assertNumberOfPeers(0, 2);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    assertNumberOfPeers(1);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+    assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertNull(rp.getPeerUUID(ID_ONE));
-    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
-    rp.disconnectFromPeer(ID_ONE);
-    assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-
-    // Connect to one peer
-    rp.connectToPeer(ID_ONE);
-    assertNumberOfPeers(1, 2);
-    assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+    rp.removePeer(ID_ONE);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(1);
+
+    // Add one peer
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.peerAdded(ID_ONE);
+    assertNumberOfPeers(2);
+    assertTrue(rp.getStatusOfPeer(ID_ONE));
     rp.disablePeer(ID_ONE);
     assertConnectedPeerStatus(false, ID_ONE);
     rp.enablePeer(ID_ONE);
     assertConnectedPeerStatus(true, ID_ONE);
-    assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-    assertNotNull(rp.getPeerUUID(ID_ONE).toString());
 
     // Disconnect peer
-    rp.disconnectFromPeer(ID_ONE);
-    assertNumberOfPeers(0, 2);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(2);
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
@@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
     }
     while (true) {
-      if (status == rp.getStatusOfConnectedPeer(peerId)) {
+      if (status == rp.getStatusOfPeer(peerId)) {
         return;
       }
       if (zkTimeoutCount < ZK_MAX_COUNT) {
@@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
     }
   }
 
-  protected void assertNumberOfPeers(int connected, int total) {
-    assertEquals(total, rp.getAllPeerClusterKeys().size());
-    assertEquals(connected, rp.getConnectedPeers().size());
+  protected void assertNumberOfPeers(int total) {
+    assertEquals(total, rp.getAllPeerConfigs().size());
     assertEquals(total, rp.getAllPeerIds().size());
   }
 
@@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, "bogus" + i);
+      rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
     }
   }
 }

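As the assertions above show, the reworked ReplicationPeers API separates peer registration (addPeer/removePeer) from local connection tracking (peerAdded/peerRemoved), and getPeerConf now returns a pair whose second element is the peer's Configuration. A condensed sketch of that life cycle against the same rp instance, with ID_ONE and KEY_ONE as in the test:

    // Peer life cycle as exercised by the test above.
    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); // register in ZK
    rp.peerAdded(ID_ONE);                                        // connect the peer locally
    boolean enabled = rp.getStatusOfPeer(ID_ONE);                // only valid for connected peers
    Configuration peerConf = rp.getPeerConf(ID_ONE).getSecond(); // pair's second element: Configuration
    rp.peerRemoved(ID_ONE);                                      // disconnect; ZK state remains
    rp.removePeer(ID_ONE);                                       // delete the peer's ZK state
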
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 5273fe3..1c3de71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -145,7 +145,7 @@ public class TestReplicationTrackerZKImpl {
 
   @Ignore ("Flakey") @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     rt.registerListener(new DummyReplicationListener());
     rp.removePeer("5");
     // wait for event
@@ -158,7 +158,7 @@ public class TestReplicationTrackerZKImpl {
   @Ignore ("Flakey") @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");

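Both tracker tests above follow the same pattern: seed a peer via the new addPeer signature, register a listener, mutate peer state, then wait for the matching callback. Condensed, with DummyReplicationListener being the test's own stub:

    // Pattern shared by testPeerRemovedEvent and testPeerListChangedEvent.
    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
    rt.registerListener(new DummyReplicationListener());
    rp.removePeer("5");   // the listener's peer-removed callback should fire shortly after
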
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
new file mode 100644
index 0000000..d4b412e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestReplicationWALEntryFilters {
+
+  static byte[] a = new byte[] {'a'};
+  static byte[] b = new byte[] {'b'};
+  static byte[] c = new byte[] {'c'};
+  static byte[] d = new byte[] {'d'};
+
+  @Test
+  public void testSystemTableWALEntryFilter() {
+    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
+
+    // meta
+    HLogKey key1 = new HLogKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+      HTableDescriptor.META_TABLEDESC.getTableName());
+    HLog.Entry metaEntry = new Entry(key1, null);
+
+    assertNull(filter.filter(metaEntry));
+
+    // ns table
+    HLogKey key2 = new HLogKey(new byte[] {}, HTableDescriptor.NAMESPACE_TABLEDESC.getTableName());
+    HLog.Entry nsEntry = new Entry(key2, null);
+    assertNull(filter.filter(nsEntry));
+
+    // user table
+
+    HLogKey key3 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+    HLog.Entry userEntry = new Entry(key3, null);
+
+    assertEquals(userEntry, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testScopeWALEntryFilter() {
+    ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+
+    HLog.Entry userEntry = createEntry(a, b);
+    HLog.Entry userEntryA = createEntry(a);
+    HLog.Entry userEntryB = createEntry(b);
+    HLog.Entry userEntryEmpty = createEntry();
+
+    // no scopes
+    assertEquals(null, filter.filter(userEntry));
+
+    // empty scopes
+    TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(null, filter.filter(userEntry));
+
+    // different scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    // all kvs should be filtered
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // local scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // only scope a
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryA, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryA, filter.filter(userEntry));
+
+    // only scope b
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryB, filter.filter(userEntry));
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryB, filter.filter(userEntry));
+
+    // scope a and b
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    // with both families globally scoped, nothing is filtered
+    assertEquals(createEntry(a, b), filter.filter(userEntry));
+  }
+
+  WALEntryFilter nullFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return null;
+    }
+  };
+
+  WALEntryFilter passFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return entry;
+    }
+  };
+
+  @Test
+  public void testChainWALEntryFilter() {
+    HLog.Entry userEntry = createEntry(a, b, c);
+
+    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    // flatten
+    filter =
+        new ChainWALEntryFilter(
+          new ChainWALEntryFilter(passFilter,
+            new ChainWALEntryFilter(passFilter, passFilter),
+          new ChainWALEntryFilter(passFilter),
+          new ChainWALEntryFilter(passFilter)),
+          new ChainWALEntryFilter(passFilter));
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter =
+        new ChainWALEntryFilter(
+          new ChainWALEntryFilter(passFilter,
+            new ChainWALEntryFilter(passFilter,
+              new ChainWALEntryFilter(nullFilter))),
+          new ChainWALEntryFilter(passFilter));
+    assertEquals(null, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testTableCfWALEntryFilter() {
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+
+    when(peer.getTableCFs()).thenReturn(null);
+    HLog.Entry userEntry = createEntry(a, b, c);
+    TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    // empty map
+    userEntry = createEntry(a, b, c);
+    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table bar
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("bar", null);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table foo:a
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a), filter.filter(userEntry));
+
+    // table foo:a,c
+    userEntry = createEntry(a, b, c, d);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a", "c"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,c), filter.filter(userEntry));
+  }
+
+  private HLog.Entry createEntry(byte[]... kvs) {
+    HLogKey key1 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new HLog.Entry(key1, edit1);
+  }
+
+  private void assertEquals(HLog.Entry e1, HLog.Entry e2) {
+    Assert.assertEquals(e1 == null, e2 == null);
+    if (e1 == null) {
+      return;
+    }
+
+    // do not compare HLogKeys
+
+    // compare kvs
+    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
+    if (e1.getEdit() == null) {
+      return;
+    }
+    List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
+    List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
+    Assert.assertEquals(kvs1.size(), kvs2.size());
+    for (int i = 0; i < kvs1.size(); i++) {
+      Assert.assertEquals(0, KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i)));
+    }
+  }
+}

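All filters above share one contract: return the entry (possibly with some KeyValues trimmed out) to keep it, or return null to drop it from replication entirely. A minimal custom filter under that contract; the class name is illustrative, and HLogKey#getTablename is assumed as the table accessor of this API generation:

    // Illustrative filter that drops every edit for one table.
    public class SkipTableWALEntryFilter implements WALEntryFilter {
      private final TableName skipped;

      public SkipTableWALEntryFilter(TableName skipped) {
        this.skipped = skipped;
      }

      @Override
      public HLog.Entry filter(HLog.Entry entry) {
        if (entry.getKey().getTablename().equals(skipped)) {
          return null;                 // null removes the whole entry from the stream
        }
        return entry;                  // anything else passes through unchanged
      }
    }
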
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 296f953..9175192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.junit.Before;
@@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
   private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
 
   private ReplicationPeers replicationPeers;
+  private HBaseReplicationEndpoint replicationEndpoint;
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationPeers = mock(ReplicationPeers.class);
+    replicationEndpoint = mock(HBaseReplicationEndpoint.class);
     sinkManager = new ReplicationSinkManager(mock(HConnection.class),
-                      PEER_CLUSTER_ID, replicationPeers, new Configuration());
+                      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
       serverNames.add(mock(ServerName.class));
     }
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -72,7 +75,7 @@ public class TestReplicationSinkManager {
     List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
       mock(ServerName.class));
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -84,8 +87,8 @@ public class TestReplicationSinkManager {
   public void testReportBadSink() {
     ServerName serverNameA = mock(ServerName.class);
     ServerName serverNameB = mock(ServerName.class);
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
-      Lists.newArrayList(serverNameA, serverNameB));
+    when(replicationEndpoint.getRegionServers())
+      .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
 
     sinkManager.chooseSinks();
     // Sanity check
@@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
     }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
 
@@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
     }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
 


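The repeated mock substitution in this file is the heart of the change: ReplicationSinkManager now asks the endpoint, rather than ReplicationPeers, for the peer cluster's region servers. Condensed into a single Mockito setup using only calls that appear in the test:

    // Sinks are now sourced from the endpoint.
    HBaseReplicationEndpoint endpoint = mock(HBaseReplicationEndpoint.class);
    when(endpoint.getRegionServers())
        .thenReturn(Lists.newArrayList(mock(ServerName.class), mock(ServerName.class)));
    ReplicationSinkManager manager = new ReplicationSinkManager(
        mock(HConnection.class), PEER_CLUSTER_ID, endpoint, new Configuration());
    manager.chooseSinks();   // picks a ratio-based subset of the endpoint's servers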