From: apurtell@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Sat, 22 Nov 2014 17:31:59 -0000
Message-Id: <32ae7e73d14a4668b472adda5fdb44a6@git.apache.org>
Subject: [1/3] hbase git commit: HBASE-11992 Backport HBASE-11367 (Pluggable replication endpoint) to 0.98 (Ramkrishna S. Vasudevan)

Repository: hbase
Updated Branches: refs/heads/0.98 15902f83d -> 25be14686

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java new file mode 100644 index 0000000..e4ec0bc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.util.List; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.ipc.RemoteException; + +/** + * A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster. + * For the slave cluster it selects a random number of peers + * using a replication ratio. For example, if the replication ratio is 0.1 + * and the slave cluster has 100 region servers, 10 will be selected. + *
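+ * As a concrete sketch of that selection (this mirrors
+ * ReplicationSinkManager.chooseSinks() further down in this patch, with
+ * "replication.source.ratio" defaulting to 0.1):
+ *   Collections.shuffle(slaveAddresses, random);
+ *   int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+ *   sinks = slaveAddresses.subList(0, numSinks);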

+ * A stream is considered down when we cannot contact a region server on the + * peer cluster for more than 55 seconds by default. + */ +@InterfaceAudience.Private +public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class); + private HConnection conn; + + private Configuration conf; + + // How long should we sleep for each retry + private long sleepForRetries; + + // Maximum number of retries before taking bold actions + private int maxRetriesMultiplier; + // Socket timeouts require even bolder actions since we don't want to DDOS + private int socketTimeoutMultiplier; + //Metrics for this source + private MetricsSource metrics; + // Handles connecting to peer region servers + private ReplicationSinkManager replicationSinkMgr; + private boolean peersSelected = false; + + @Override + public void init(Context context) throws IOException { + super.init(context); + this.conf = HBaseConfiguration.create(ctx.getConfiguration()); + decorateConf(); + this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); + this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", + maxRetriesMultiplier * maxRetriesMultiplier); + // TODO: This connection is replication specific or we should make it particular to + // replication and make replication specific settings such as compression or codec to use + // passing Cells. + this.conn = HConnectionManager.createConnection(this.conf); + this.sleepForRetries = + this.conf.getLong("replication.source.sleepforretries", 1000); + this.metrics = context.getMetrics(); + // ReplicationQueueInfo parses the peerId out of the znode for us + this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf); + } + + private void decorateConf() { + String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY); + if (StringUtils.isNotEmpty(replicationCodec)) { + this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec); + } + } + + private void connectToPeers() { + getRegionServers(); + + int sleepMultiplier = 1; + + // Connect to peer cluster first, unless we have to stop + while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { + replicationSinkMgr.chooseSinks(); + if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { + if (sleepForRetries("Waiting for peers", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + } + + /** + * Do the sleeping logic + * @param msg Why we sleep + * @param sleepMultiplier by how many times the default sleeping time is augmented + * @return True if sleepMultiplier is < maxRetriesMultiplier + */ + protected boolean sleepForRetries(String msg, int sleepMultiplier) { + try { + if (LOG.isTraceEnabled()) { + LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); + } + Thread.sleep(this.sleepForRetries * sleepMultiplier); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping between retries"); + } + return sleepMultiplier < maxRetriesMultiplier; + } + + /** + * Do the shipping logic + */ + @Override + public boolean replicate(ReplicateContext replicateContext) { + List entries = replicateContext.getEntries(); + int sleepMultiplier = 1; + while (this.isRunning()) { + if (!peersSelected) { + connectToPeers(); + peersSelected = true; + } + + if (!isPeerEnabled()) { + if (sleepForRetries("Replication is disabled", sleepMultiplier)) { + 
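+ // sleepForRetries() above sleeps this.sleepForRetries * sleepMultiplier ms
+ // ("replication.source.sleepforretries", 1000 ms by default) and returns
+ // true only while sleepMultiplier < maxRetriesMultiplier (10 by default),
+ // so this retry loop backs off linearly until the multiplier stops growing.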
sleepMultiplier++; + } + continue; + } + SinkPeer sinkPeer = null; + try { + sinkPeer = replicationSinkMgr.getReplicationSink(); + BlockingInterface rrs = sinkPeer.getRegionServer(); + if (LOG.isTraceEnabled()) { + LOG.trace("Replicating " + entries.size() + + " entries of total size " + replicateContext.getSize()); + } + ReplicationProtbufUtil.replicateWALEntry(rrs, + entries.toArray(new HLog.Entry[entries.size()])); + + // update metrics + this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); + return true; + + } catch (IOException ioe) { + // Didn't ship anything, but must still age the last time we did + this.metrics.refreshAgeOfLastShippedOp(); + if (ioe instanceof RemoteException) { + ioe = ((RemoteException) ioe).unwrapRemoteException(); + LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); + if (ioe instanceof TableNotFoundException) { + if (sleepForRetries("A table is missing in the peer cluster. " + + "Replication cannot proceed without losing data.", sleepMultiplier)) { + sleepMultiplier++; + } + } + } else { + if (ioe instanceof SocketTimeoutException) { + // This exception means we waited for more than 60s and nothing + // happened, the cluster is alive and calling it right away + // even for a test just makes things worse. + sleepForRetries("Encountered a SocketTimeoutException. Since the " + + "call to the remote cluster timed out, which is usually " + + "caused by a machine failure or a massive slowdown", + this.socketTimeoutMultiplier); + } else if (ioe instanceof ConnectException) { + LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); + replicationSinkMgr.chooseSinks(); + } else { + LOG.warn("Can't replicate because of a local or network error: ", ioe); + } + } + + if (sinkPeer != null) { + replicationSinkMgr.reportBadSink(sinkPeer); + } + if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + return false; // in case we exited before replicating + } + + protected boolean isPeerEnabled() { + return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED; + } + + @Override + protected void doStop() { + disconnect(); //don't call super.doStop() + if (this.conn != null) { + try { + this.conn.close(); + this.conn = null; + } catch (IOException e) { + LOG.warn("Failed to close the connection"); + } + } + notifyStopped(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index c6f0329..6a1ca36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -22,13 +22,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * This class is for maintaining the various replication statistics for a source and publishing them * through the metrics 
interfaces. */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class MetricsSource { public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; @@ -152,7 +153,7 @@ public class MetricsSource { rms.incCounters(shippedKBsKey, sizeInKB); rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB); } - + /** increase the byte number read by source from log file */ public void incrLogReadInBytes(long readInBytes) { rms.incCounters(logReadInBytesKey, readInBytes); http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java index 2c36179..28fb50f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.replication.ReplicationPeers; - +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -61,7 +60,7 @@ public class ReplicationSinkManager { private final String peerClusterId; - private final ReplicationPeers replicationPeers; + private final HBaseReplicationEndpoint endpoint; // Count of "bad replication sink" reports per peer sink private final Map badReportCounts; @@ -85,15 +84,15 @@ public class ReplicationSinkManager { * Instantiate for a single replication peer cluster. 
* @param conn connection to the peer cluster * @param peerClusterId identifier of the peer cluster - * @param replicationPeers manages peer clusters being replicated to + * @param endpoint replication endpoint for inter cluster replication * @param conf HBase configuration, used for determining replication source ratio and bad peer * threshold */ public ReplicationSinkManager(HConnection conn, String peerClusterId, - ReplicationPeers replicationPeers, Configuration conf) { + HBaseReplicationEndpoint endpoint, Configuration conf) { this.conn = conn; this.peerClusterId = peerClusterId; - this.replicationPeers = replicationPeers; + this.endpoint = endpoint; this.badReportCounts = Maps.newHashMap(); this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold", @@ -107,8 +106,7 @@ public class ReplicationSinkManager { * @return a replication sink to replicate to */ public SinkPeer getReplicationSink() throws IOException { - if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId) > this.lastUpdateToPeers || - sinks.isEmpty()) { + if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { LOG.info("Current list of sinks is out of date, updating"); chooseSinks(); } @@ -143,8 +141,7 @@ public class ReplicationSinkManager { } void chooseSinks() { - List slaveAddresses = - replicationPeers.getRegionServersOfConnectedPeer(peerClusterId); + List slaveAddresses = endpoint.getRegionServers(); Collections.shuffle(slaveAddresses, random); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); sinks = slaveAddresses.subList(0, numSinks); http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bdec2a2..1e7f5c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -40,27 +36,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; -import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; +import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.ipc.RemoteException; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; /** * Class that handles the source of a replication stream. @@ -81,9 +74,9 @@ public class ReplicationSource extends Thread public static final Log LOG = LogFactory.getLog(ReplicationSource.class); // Queue of logs to process private PriorityBlockingQueue queue; - private HConnection conn; private ReplicationQueues replicationQueues; private ReplicationPeers replicationPeers; + private Configuration conf; private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to @@ -117,8 +110,6 @@ public class ReplicationSource extends Thread private String peerClusterZnode; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; - // Socket timeouts require even bolder actions since we don't want to DDOS - private int socketTimeoutMultiplier; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; // Current size of data we need to replicate @@ -129,10 +120,14 @@ public class ReplicationSource extends Thread private MetricsSource metrics; // Handle on the log reader helper private ReplicationHLogReaderManager repLogReader; - // Handles connecting to peer region servers - private ReplicationSinkManager replicationSinkMgr; //WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; + // ReplicationEndpoint which will handle the actual replication + private ReplicationEndpoint replicationEndpoint; + // A filter (or a chain of filters) for the WAL entries. 
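+ // (Assembled in run() below: a ChainWALEntryFilter over a
+ // SystemTableWALEntryFilter plus the optional filter returned by
+ // ReplicationEndpoint#getWALEntryfilter(), which may be null.)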
+ private WALEntryFilter walEntryFilter; + // Context for ReplicationEndpoint#replicate() + private ReplicationEndpoint.ReplicateContext replicateContext; // throttler private ReplicationThrottler throttler; @@ -144,30 +139,30 @@ public class ReplicationSource extends Thread * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver * @param peerClusterZnode the name of our znode + * @param clusterId unique UUID for the cluster + * @param replicationEndpoint the replication endpoint implementation + * @param metrics metrics for replication source * @throws IOException */ + @Override public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId) throws IOException { + final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, + final MetricsSource metrics) + throws IOException { this.stopper = stopper; - this.conf = HBaseConfiguration.create(conf); + this.conf = conf; decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); - this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", - maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue( this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); - // TODO: This connection is replication specific or we should make it particular to - // replication and make replication specific settings such as compression or codec to use - // passing Cells. 
- this.conn = HConnectionManager.getConnection(this.conf); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; @@ -176,7 +171,7 @@ public class ReplicationSource extends Thread this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; - this.metrics = new MetricsSource(peerClusterZnode); + this.metrics = metrics; this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.clusterId = clusterId; @@ -184,8 +179,10 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); - this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); + this.replicationEndpoint = replicationEndpoint; + + this.replicateContext = new ReplicationEndpoint.ReplicateContext(); } private void decorateConf() { @@ -208,30 +205,48 @@ public class ReplicationSource extends Thread } private void uninitialize() { - if (this.conn != null) { - try { - this.conn.close(); - } catch (IOException e) { - LOG.debug("Attempt to close connection failed", e); - } - } LOG.debug("Source exiting " + this.peerId); metrics.clear(); + if (replicationEndpoint.state() == Service.State.STARTING + || replicationEndpoint.state() == Service.State.RUNNING) { + replicationEndpoint.stopAndWait(); + } } @Override public void run() { - connectToPeers(); // We were stopped while looping to connect to sinks, just abort if (!this.isActive()) { uninitialize(); return; } + try { + // start the endpoint, connect to the cluster + Service.State state = replicationEndpoint.start().get(); + if (state != Service.State.RUNNING) { + LOG.warn("ReplicationEndpoint was not started. Exiting"); + uninitialize(); + return; + } + } catch (Exception ex) { + LOG.warn("Error starting ReplicationEndpoint, exiting", ex); + throw new RuntimeException(ex); + } + + // get the WALEntryFilter from ReplicationEndpoint and add it to default filters + ArrayList filters = Lists.newArrayList( + (WALEntryFilter)new SystemTableWALEntryFilter()); + WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); + if (filterFromEndpoint != null) { + filters.add(filterFromEndpoint); + } + this.walEntryFilter = new ChainWALEntryFilter(filters); + int sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isActive() && this.peerClusterId == null) { - this.peerClusterId = replicationPeers.getPeerUUID(this.peerId); + this.peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isActive() && this.peerClusterId == null) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; @@ -249,9 +264,10 @@ public class ReplicationSource extends Thread // In rare case, zookeeper setting may be messed up. 
That leads to the incorrect // peerClusterId value, which is the same as the source clusterId - if (clusterId.equals(peerClusterId)) { + if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " - + peerClusterId); + + peerClusterId + " which is not allowed by ReplicationEndpoint:" + + replicationEndpoint.getClass().getName(), null, false); } LOG.info("Replicating "+clusterId + " -> " + peerClusterId); @@ -399,8 +415,8 @@ public class ReplicationSource extends Thread * entries * @throws IOException */ - protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List entries) - throws IOException{ + protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, + List entries) throws IOException{ long seenEntries = 0; if (LOG.isTraceEnabled()) { LOG.trace("Seeking in " + this.currentPath + " at position " @@ -411,18 +427,22 @@ public class ReplicationSource extends Thread HLog.Entry entry = this.repLogReader.readNextAndSetPosition(); while (entry != null) { - WALEdit edit = entry.getEdit(); this.metrics.incrLogEditsRead(); seenEntries++; - // Remove all KVs that should not be replicated - HLogKey logKey = entry.getKey(); + // don't replicate if the log entries have already been consumed by the cluster - if (!logKey.getClusterIds().contains(peerClusterId)) { - removeNonReplicableEdits(entry); - // Don't replicate catalog entries, if the WALEdit wasn't - // containing anything to replicate and if we're currently not set to replicate - if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) && - edit.size() != 0) { + if (replicationEndpoint.canReplicateToSameCluster() + || !entry.getKey().getClusterIds().contains(peerClusterId)) { + // Remove all KVs that should not be replicated + entry = walEntryFilter.filter(entry); + WALEdit edit = null; + HLogKey logKey = null; + if (entry != null) { + edit = entry.getEdit(); + logKey = entry.getKey(); + } + + if (edit != null && edit.size() != 0) { //Mark that the current cluster has the change logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); @@ -453,20 +473,6 @@ public class ReplicationSource extends Thread return seenEntries == 0 && processEndOfFile(); } - private void connectToPeers() { - int sleepMultiplier = 1; - - // Connect to peer cluster first, unless we have to stop - while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) { - replicationSinkMgr.chooseSinks(); - if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) { - if (sleepForRetries("Waiting for peers", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - } - /** * Poll for the next path * @return true if a path was obtained, false if not @@ -596,8 +602,8 @@ public class ReplicationSource extends Thread /* * Checks whether the current log file is empty, and it is not a recovered queue. This is to * handle scenario when in an idle cluster, there is no entry in the current log and we keep on - * trying to read the log file and get EOFEception. In case of a recovered queue the last log file - * may be empty, and we don't want to retry that. + * trying to read the log file and get EOFException. In case of a recovered queue the last log + * file may be empty, and we don't want to retry that. 
*/ private boolean isCurrentLogEmpty() { return (this.repLogReader.getPosition() == 0 && @@ -624,47 +630,6 @@ public class ReplicationSource extends Thread } /** - * We only want KVs that are scoped other than local - * @param entry The entry to check for replication - */ - protected void removeNonReplicableEdits(HLog.Entry entry) { - String tabName = entry.getKey().getTablename().getNameAsString(); - ArrayList kvs = entry.getEdit().getKeyValues(); - Map> tableCFs = null; - try { - tableCFs = this.replicationPeers.getTableCFs(peerId); - } catch (IllegalArgumentException e) { - LOG.error("should not happen: can't get tableCFs for peer " + peerId + - ", degenerate as if it's not configured by keeping tableCFs==null"); - } - int size = kvs.size(); - - // clear kvs(prevent replicating) if logKey's table isn't in this peer's - // replicable table list (empty tableCFs means all table are replicable) - if (tableCFs != null && !tableCFs.containsKey(tabName)) { - kvs.clear(); - } else { - NavigableMap scopes = entry.getKey().getScopes(); - List cfs = (tableCFs == null) ? null : tableCFs.get(tabName); - for (int i = size - 1; i >= 0; i--) { - KeyValue kv = kvs.get(i); - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if (scopes == null || !scopes.containsKey(kv.getFamily()) || - (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) { - kvs.remove(i); - } - } - } - - if (kvs.size() < size/2) { - kvs.trimToSize(); - } - } - - /** * Count the number of different row keys in the given edit because of * mini-batching. We assume that there's at least one KV in the WALEdit. * @param edit edit to count row keys from @@ -694,13 +659,6 @@ public class ReplicationSource extends Thread return; } while (this.isActive()) { - if (!isPeerEnabled()) { - if (sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - SinkPeer sinkPeer = null; try { if (this.throttler.isEnabled()) { long sleepTicks = this.throttler.getNextSleepInterval(currentSize); @@ -721,14 +679,15 @@ public class ReplicationSource extends Thread this.throttler.resetStartTick(); } } - sinkPeer = replicationSinkMgr.getReplicationSink(); - BlockingInterface rrs = sinkPeer.getRegionServer(); - if (LOG.isTraceEnabled()) { - LOG.trace("Replicating " + entries.size() + - " entries of total size " + currentSize); + replicateContext.setEntries(entries).setSize(currentSize); + + // send the edits to the endpoint. 
Will block until the edits are shipped and acknowledged + boolean replicated = replicationEndpoint.replicate(replicateContext); + + if (!replicated) { + continue; } - ReplicationProtbufUtil.replicateWALEntry(rrs, - entries.toArray(new HLog.Entry[entries.size()])); + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), @@ -747,50 +706,9 @@ public class ReplicationSource extends Thread + this.totalReplicatedOperations + " operations"); } break; - - } catch (IOException ioe) { - // Didn't ship anything, but must still age the last time we did - this.metrics.refreshAgeOfLastShippedOp(); - if (ioe instanceof RemoteException) { - ioe = ((RemoteException) ioe).unwrapRemoteException(); - LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); - if (ioe instanceof TableNotFoundException) { - if (sleepForRetries("A table is missing in the peer cluster. " - + "Replication cannot proceed without losing data.", sleepMultiplier)) { - sleepMultiplier++; - } - // current thread might be interrupted to terminate - // directly go back to while() for confirm this - if (isInterrupted()) { - continue; - } - } - } else { - if (ioe instanceof SocketTimeoutException) { - // This exception means we waited for more than 60s and nothing - // happened, the cluster is alive and calling it right away - // even for a test just makes things worse. - sleepForRetries("Encountered a SocketTimeoutException. Since the " + - "call to the remote cluster timed out, which is usually " + - "caused by a machine failure or a massive slowdown", - this.socketTimeoutMultiplier); - // current thread might be interrupted to terminate - // directly go back to while() for confirm this - if (isInterrupted()) { - continue; - } - } else if (ioe instanceof ConnectException) { - LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); - replicationSinkMgr.chooseSinks(); - } else { - LOG.warn("Can't replicate because of a local or network error: ", ioe); - } - } - - if (sinkPeer != null) { - replicationSinkMgr.reportBadSink(sinkPeer); - } - if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { + } catch (Exception ex) { + LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex); + if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } } @@ -803,7 +721,7 @@ public class ReplicationSource extends Thread * @return true if the peer is enabled, otherwise false */ protected boolean isPeerEnabled() { - return this.replicationPeers.getStatusOfConnectedPeer(this.peerId); + return this.replicationPeers.getStatusOfPeer(this.peerId); } /** @@ -837,10 +755,12 @@ public class ReplicationSource extends Thread return false; } + @Override public void startup() { String n = Thread.currentThread().getName(); Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(final Thread t, final Throwable e) { LOG.error("Unexpected exception in ReplicationSource," + " currentPath=" + currentPath, e); @@ -851,11 +771,17 @@ public class ReplicationSource extends Thread this.peerClusterZnode, handler); } + @Override public void terminate(String reason) { terminate(reason, null); } + @Override public void terminate(String reason, Exception cause) { + terminate(reason, cause, true); + } + + public void terminate(String reason, Exception cause, boolean join) { if 
(cause == null) { LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason); @@ -866,17 +792,33 @@ public class ReplicationSource extends Thread } this.running = false; this.interrupt(); - Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier); + ListenableFuture future = null; + if (this.replicationEndpoint != null) { + future = this.replicationEndpoint.stop(); + } + if (join) { + Threads.shutdown(this, this.sleepForRetries); + if (future != null) { + try { + future.get(); + } catch (Exception e) { + LOG.warn("Got exception:" + e); + } + } + } } + @Override public String getPeerClusterZnode() { return this.peerClusterZnode; } + @Override public String getPeerClusterId() { return this.peerId; } + @Override public Path getCurrentPath() { return this.currentPath; } http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 17a6e6e..1e9c714 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -50,7 +51,8 @@ public interface ReplicationSourceInterface { public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId) throws IOException; + final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, + final MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e3b8dc9..f247bb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -44,9 +44,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; +import 
org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.zookeeper.KeeperException; @@ -119,7 +123,7 @@ public class ReplicationSourceManager implements ReplicationListener { final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. - //Generally, reading is more than modifying. + //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList(); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; @@ -209,7 +213,7 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server hlog queues */ protected void init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getConnectedPeers()) { + for (String id : this.replicationPeers.getPeerIds()) { addSource(id); } List currentReplicators = this.replicationQueues.getListOfReplicators(); @@ -236,9 +240,12 @@ public class ReplicationSourceManager implements ReplicationListener { */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { + ReplicationPeerConfig peerConfig + = replicationPeers.getReplicationPeerConfig(id); + ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, stopper, id, this.clusterId); + this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -269,7 +276,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void deleteSource(String peerId, boolean closeConnection) { this.replicationQueues.removeQueue(peerId); if (closeConnection) { - this.replicationPeers.disconnectFromPeer(peerId); + this.replicationPeers.peerRemoved(peerId); } } @@ -362,7 +369,9 @@ public class ReplicationSourceManager implements ReplicationListener { protected ReplicationSourceInterface getReplicationSource(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, - final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException { + final Stoppable stopper, final String peerId, final UUID clusterId, + final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) + throws IOException { ReplicationSourceInterface src; try { @SuppressWarnings("rawtypes") @@ -373,9 +382,32 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.warn("Passed replication source implementation throws errors, " + "defaulting to ReplicationSource", e); src = new ReplicationSource(); + } + ReplicationEndpoint replicationEndpoint = null; + try { + String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); + if (replicationEndpointImpl == null) { + // Default to HBase inter-cluster replication endpoint + replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); + } + @SuppressWarnings("rawtypes") + Class c = 
Class.forName(replicationEndpointImpl); + replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + } catch (Exception e) { + LOG.warn("Passed replication endpoint implementation throws errors", e); + throw new IOException(e); } - src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId); + + MetricsSource metrics = new MetricsSource(peerId); + // init replication source + src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, + clusterId, replicationEndpoint, metrics); + + // init replication endpoint + replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), + fs, peerConfig, peerId, clusterId, replicationPeer, metrics)); + return src; } @@ -464,7 +496,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void peerListChanged(List peerIds) { for (String id : peerIds) { try { - boolean added = this.replicationPeers.connectToPeer(id); + boolean added = this.replicationPeers.peerAdded(id); if (added) { addSource(id); } @@ -530,10 +562,26 @@ public class ReplicationSourceManager implements ReplicationListener { for (Map.Entry> entry : newQueues.entrySet()) { String peerId = entry.getKey(); try { + // there is not an actual peer defined corresponding to peerId for the failover. + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + String actualPeerId = replicationQueueInfo.getPeerId(); + ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); + ReplicationPeerConfig peerConfig = null; + try { + peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId); + } catch (ReplicationException ex) { + LOG.warn("Received exception while getting replication peer config, skipping replay" + + ex); + } + if (peer == null || peerConfig == null) { + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode); + continue; + } + ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - stopper, peerId, this.clusterId); - if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) { + stopper, peerId, this.clusterId, peerConfig, peer); + if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; } @@ -586,7 +634,7 @@ public class ReplicationSourceManager implements ReplicationListener { stats.append(source.getStats() + "\n"); } for (ReplicationSourceInterface oldSource : oldsources) { - stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": "); + stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": "); stats.append(oldSource.getStats()+ "\n"); } return stats.toString(); http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index fa744ff..77bc64e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.client.replication; 
+import java.util.List; +import java.util.Map; +import java.util.TreeMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -25,6 +29,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; + import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -117,5 +123,36 @@ public class TestReplicationAdmin { admin.removePeer(ID_ONE); } + @Test + public void testGetTableCfsStr() { + // opposite of TestPerTableCFReplication#testParseTableCFsFromConfig() + + Map> tabCFsMap = null; + + // 1. null or empty string, result should be null + assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap)); + + + // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" + tabCFsMap = new TreeMap>(); + tabCFsMap.put(TableName.valueOf("tab1"), null); // its table name is "tab1" + assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap)); + + tabCFsMap = new TreeMap>(); + tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1")); + assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap)); + + tabCFsMap = new TreeMap>(); + tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3")); + assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap)); + + // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" + tabCFsMap = new TreeMap>(); + tabCFsMap.put(TableName.valueOf("tab1"), null); + tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1")); + tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3")); + assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap)); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index a2d4a8d..f463f76 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; @@ -40,7 +41,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId) throws IOException { + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index ee102fc..ff77a94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -175,30 +175,30 @@ public class TestPerTableCFReplication { Map> tabCFsMap = null; // 1. null or empty string, result should be null - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(""); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(""); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" "); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" "); assertEquals(null, tabCFsMap); // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1" assertFalse(tabCFsMap.containsKey("tab2")); // not other table assertEquals(null, tabCFsMap.get("tab1")); // null cf-list, - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2" assertFalse(tabCFsMap.containsKey("tab1")); // not other table assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1" - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab2" assertFalse(tabCFsMap.containsKey("tab1")); // not other table @@ -207,7 +207,7 @@ public class TestPerTableCFReplication { assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3" // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); // 3.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); assertTrue(tabCFsMap.containsKey("tab1")); @@ -225,7 +225,8 @@ public class TestPerTableCFReplication { // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig( + "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;"); // 4.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); assertTrue(tabCFsMap.containsKey("tab1")); @@ -243,7 +244,8 @@ public class TestPerTableCFReplication { // 5. 
invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3" // "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally - tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"); + tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig( + "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"); // 5.1 no "tab1" and "tab2", only "tab3" assertEquals(1, tabCFsMap.size()); // only one table assertFalse(tabCFsMap.containsKey("tab1")); http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java new file mode 100644 index 0000000..38aaa7a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests ReplicationSource and ReplicationEndpoint interactions + */ +@Category(MediumTests.class) +public class TestReplicationEndpoint extends TestReplicationBase { + + static int numRegionServers; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestReplicationBase.setUpBeforeClass(); + utility2.shutdownMiniCluster(); // we don't need the second cluster + admin.removePeer("2"); + numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestReplicationBase.tearDownAfterClass(); + // check stop is called + Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); + } + + @Before + public void setup() throws FailedLogCloseException, IOException { + ReplicationEndpointForTest.contructedCount.set(0); + ReplicationEndpointForTest.startedCount.set(0); + ReplicationEndpointForTest.replicateCount.set(0); + ReplicationEndpointForTest.lastEntries = null; + for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) { + utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString()); + } + } + + @Test + public void testCustomReplicationEndpoint() throws Exception { + // test installing a custom replication endpoint other than the default one. + admin.addPeer("testCustomReplicationEndpoint", + new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + + // check whether the class has been constructed and started + Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; + } + }); + + Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; + } + }); + + Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + + // now replicate some data. 
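+ // (doPut() below writes one cell through htable1; because shipping is
+ // asynchronous, the test then polls the endpoint's static replicateCount
+ // with Waiter.waitFor until the edit has been replicated.)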
+
+  @Test
+  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
+    admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+    // now replicate some data.
+    doPut(row);
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointReturningFalse.replicated.get();
+      }
+    });
+    if (ReplicationEndpointReturningFalse.ex.get() != null) {
+      throw ReplicationEndpointReturningFalse.ex.get();
+    }
+
+    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
+  }
+
+  @Test
+  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
+    admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+    // now replicate some data.
+    doPut(Bytes.toBytes("row1"));
+    doPut(row);
+    doPut(Bytes.toBytes("row2"));
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.replicateCount.get() >= 1;
+      }
+    });
+
+    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
+  }
+
+  private void doPut(byte[] row) throws IOException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+    htable1.close();
+  }
+
+  private static void doAssert(byte[] row) throws Exception {
+    if (ReplicationEndpointForTest.lastEntries == null) {
+      return; // first call
+    }
+    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
+    List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
+      kvs.get(0).getRowLength(), row, 0, row.length));
+  }
+
+  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
+    static UUID uuid = UUID.randomUUID();
+    static AtomicInteger constructedCount = new AtomicInteger();
+    static AtomicInteger startedCount = new AtomicInteger();
+    static AtomicInteger stoppedCount = new AtomicInteger();
+    static AtomicInteger replicateCount = new AtomicInteger();
+    static volatile List<HLog.Entry> lastEntries = null;
+
+    public ReplicationEndpointForTest() {
+      constructedCount.incrementAndGet();
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return uuid;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      lastEntries = replicateContext.entries;
+      return true;
+    }
+
+    @Override
+    protected void doStart() {
+      startedCount.incrementAndGet();
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      stoppedCount.incrementAndGet();
+      notifyStopped();
+    }
+  }
+
+  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+    static AtomicBoolean replicated = new AtomicBoolean(false);
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        // check row
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+
+      super.replicate(replicateContext);
+
+      replicated.set(replicateCount.get() > 10); // first 10 times, we return false
+      return replicated.get();
+    }
+  }
+
+  // return a WALEntry filter which only accepts "row", but not other rows
+  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        super.replicate(replicateContext);
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+      return true;
+    }
+
+    @Override
+    public WALEntryFilter getWALEntryfilter() {
+      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
+        @Override
+        public Entry filter(Entry entry) {
+          ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+          int size = kvs.size();
+          for (int i = size - 1; i >= 0; i--) {
+            KeyValue kv = kvs.get(i);
+            if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+                row, 0, row.length)) {
+              kvs.remove(i);
+            }
+          }
+          return entry;
+        }
+      });
+    }
+  }
+}
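Taken together, the three stub endpoints above cover the contract a pluggable endpoint must honor: construct, doStart()/notifyStarted(), replicate() per batch, doStop()/notifyStopped(). A hedged sketch of what a downstream implementation could look like (class and peer names are invented; the calls mirror this test file, not a documented recipe):

    // Minimal custom endpoint: acknowledges every batch without shipping it.
    public static class NoOpReplicationEndpoint extends BaseReplicationEndpoint {
      private final UUID uuid = UUID.randomUUID();

      @Override
      public UUID getPeerUUID() {
        return uuid;
      }

      @Override
      public boolean replicate(ReplicateContext context) {
        // true acknowledges the batch; false makes the source retry it,
        // which is exactly what ReplicationEndpointReturningFalse exercises
        return true;
      }

      @Override
      protected void doStart() { notifyStarted(); }

      @Override
      protected void doStop() { notifyStopped(); }
    }

    // Installed the same way the tests do:
    admin.addPeer("noop",
      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
        .setReplicationEndpointImpl(NoOpReplicationEndpoint.class.getName()), null);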
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fd003ad..e560620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
     } catch (IllegalArgumentException e) {
     }
     try {
-      rp.getStatusOfConnectedPeer("bogus");
+      rp.getStatusOfPeer("bogus");
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
     } catch (IllegalArgumentException e) {
     }
 
-    assertFalse(rp.connectToPeer("bogus"));
-    rp.disconnectFromPeer("bogus");
-    assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
-    assertNull(rp.getPeerUUID("bogus"));
+    assertFalse(rp.peerAdded("bogus"));
+    rp.peerRemoved("bogus");
+    assertNull(rp.getPeerConf("bogus"));
 
-    assertNumberOfPeers(0, 0);
+    assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, KEY_ONE);
-    assertNumberOfPeers(0, 1);
-    rp.addPeer(ID_TWO, KEY_TWO);
-    assertNumberOfPeers(0, 2);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    assertNumberOfPeers(1);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+    assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertNull(rp.getPeerUUID(ID_ONE));
-    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
-    rp.disconnectFromPeer(ID_ONE);
-    assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-
-    // Connect to one peer
-    rp.connectToPeer(ID_ONE);
-    assertNumberOfPeers(1, 2);
-    assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+    rp.removePeer(ID_ONE);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(1);
+
+    // Add one peer
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.peerAdded(ID_ONE);
+    assertNumberOfPeers(2);
+    assertTrue(rp.getStatusOfPeer(ID_ONE));
     rp.disablePeer(ID_ONE);
     assertConnectedPeerStatus(false, ID_ONE);
     rp.enablePeer(ID_ONE);
     assertConnectedPeerStatus(true, ID_ONE);
-    assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-    assertNotNull(rp.getPeerUUID(ID_ONE).toString());
 
     // Disconnect peer
-    rp.disconnectFromPeer(ID_ONE);
-    assertNumberOfPeers(0, 2);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(2);
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
@@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
     }
     while (true) {
-      if (status == rp.getStatusOfConnectedPeer(peerId)) {
+      if (status == rp.getStatusOfPeer(peerId)) {
         return;
       }
       if (zkTimeoutCount < ZK_MAX_COUNT) {
@@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
     }
   }
 
-  protected void assertNumberOfPeers(int connected, int total) {
-    assertEquals(total, rp.getAllPeerClusterKeys().size());
-    assertEquals(connected, rp.getConnectedPeers().size());
+  protected void assertNumberOfPeers(int total) {
+    assertEquals(total, rp.getAllPeerConfigs().size());
+    assertEquals(total, rp.getAllPeerIds().size());
     assertEquals(total, rp.getAllPeerIds().size());
   }
 
@@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, "bogus" + i);
+      rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
     }
   }
 }
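The hunk above is mostly a rename sweep: the old connect/disconnect vocabulary maps onto the new peer-lifecycle methods. A compact crib, using the same rp (ReplicationPeers) and constants as the test and only calls that appear in this diff (illustrative only):

    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
    rp.peerAdded(ID_ONE);                          // was: rp.connectToPeer(ID_ONE)
    boolean enabled = rp.getStatusOfPeer(ID_ONE);  // was: getStatusOfConnectedPeer()
    Configuration peerConf = rp.getPeerConf(ID_ONE).getSecond();  // now returns a Pair
    rp.peerRemoved(ID_ONE);                        // was: rp.disconnectFromPeer(ID_ONE)
    rp.removePeer(ID_ONE);                         // unregister the peer entirely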
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 99a7593..48a6245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,12 +40,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -142,7 +143,7 @@ public class TestReplicationTrackerZKImpl {
 
   @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     rt.registerListener(new DummyReplicationListener());
     rp.removePeer("5");
     // wait for event
@@ -155,7 +156,7 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");
@@ -177,16 +178,16 @@ public class TestReplicationTrackerZKImpl {
   public void testPeerNameControl() throws Exception {
     int exists = 0;
     int hyphen = 0;
-    rp.addPeer("6", utility.getClusterKey(), null);
+    rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     try{
-      rp.addPeer("6", utility.getClusterKey(), null);
+      rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     }catch(IllegalArgumentException e){
       exists++;
     }
     try{
-      rp.addPeer("6-ec2", utility.getClusterKey(), null);
+      rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     }catch(IllegalArgumentException e){
       hyphen++;
     }
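DummyReplicationListener is defined elsewhere in this test class, so for orientation only: registering a listener with the tracker looks roughly like the sketch below. The callback set is assumed from the events these tests wait on; consult the ReplicationListener interface for the authoritative signatures.

    rt.registerListener(new ReplicationListener() {
      @Override
      public void regionServerRemoved(String regionServer) {
        // a source region server left the cluster
      }

      @Override
      public void peerRemoved(String peerId) {
        // fires after rp.removePeer(peerId), as testPeerRemovedEvent waits for
      }

      @Override
      public void peerListChanged(List<String> peerIds) {
        // fires when the peer set changes, as testPeerListChangedEvent waits for
      }
    });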
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
new file mode 100644
index 0000000..2bd813c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestReplicationWALEntryFilters {
+
+  static byte[] a = new byte[] {'a'};
+  static byte[] b = new byte[] {'b'};
+  static byte[] c = new byte[] {'c'};
+  static byte[] d = new byte[] {'d'};
+
+  @Test
+  public void testSystemTableWALEntryFilter() {
+    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
+
+    // meta
+    HLogKey key1 = new HLogKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+      HTableDescriptor.META_TABLEDESC.getTableName(), 0, 0, null);
+    HLog.Entry metaEntry = new Entry(key1, null);
+
+    assertNull(filter.filter(metaEntry));
+
+    // ns table
+    HLogKey key2 = new HLogKey(new byte[] {}, HTableDescriptor.NAMESPACE_TABLEDESC.getTableName(),
+      0, 0, null);
+    HLog.Entry nsEntry = new Entry(key2, null);
+    assertNull(filter.filter(nsEntry));
+
+    // user table
+    HLogKey key3 = new HLogKey(new byte[] {}, TableName.valueOf("foo"), 0, 0, null);
+    HLog.Entry userEntry = new Entry(key3, null);
+
+    assertEquals(userEntry, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testScopeWALEntryFilter() {
+    ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+
+    HLog.Entry userEntry = createEntry(a, b);
+    HLog.Entry userEntryA = createEntry(a);
+    HLog.Entry userEntryB = createEntry(b);
+    HLog.Entry userEntryEmpty = createEntry();
+
+    // no scopes
+    assertEquals(null, filter.filter(userEntry));
+
+    // empty scopes
+    TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(null, filter.filter(userEntry));
+
+    // different scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    // all kvs should be filtered
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // local scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // only scope a
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryA, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryA, filter.filter(userEntry));
+
+    // only scope b
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryB, filter.filter(userEntry));
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryB, filter.filter(userEntry));
+
+    // scope a and b
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryB, filter.filter(userEntry));
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryB, filter.filter(userEntry));
+  }
+
+  WALEntryFilter nullFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return null;
+    }
+  };
+
+  WALEntryFilter passFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return entry;
+    }
+  };
+
+  @Test
+  public void testChainWALEntryFilter() {
+    HLog.Entry userEntry = createEntry(a, b, c);
+
+    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    // flatten
+    filter =
+      new ChainWALEntryFilter(
+        new ChainWALEntryFilter(passFilter,
+          new ChainWALEntryFilter(passFilter, passFilter),
+          new ChainWALEntryFilter(passFilter),
+          new ChainWALEntryFilter(passFilter)),
+        new ChainWALEntryFilter(passFilter));
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter =
+      new ChainWALEntryFilter(
+        new ChainWALEntryFilter(passFilter,
+          new ChainWALEntryFilter(passFilter,
+            new ChainWALEntryFilter(nullFilter))),
+        new ChainWALEntryFilter(passFilter));
+    assertEquals(null, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testTableCfWALEntryFilter() {
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+
+    when(peer.getTableCFs()).thenReturn(null);
+    HLog.Entry userEntry = createEntry(a, b, c);
+    TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    // empty map
+    userEntry = createEntry(a, b, c);
+    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table bar
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("bar", null);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table foo:a
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a), filter.filter(userEntry));
+
+    // table foo:a,c
+    userEntry = createEntry(a, b, c, d);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a", "c"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,c), filter.filter(userEntry));
+  }
+
+  private HLog.Entry createEntry(byte[]... kvs) {
+    HLogKey key1 = new HLogKey(new byte[] {}, TableName.valueOf("foo"), 0, 0, null);
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new HLog.Entry(key1, edit1);
+  }
+
+  private void assertEquals(HLog.Entry e1, HLog.Entry e2) {
+    Assert.assertEquals(e1 == null, e2 == null);
+    if (e1 == null) {
+      return;
+    }
+
+    // do not compare HLogKeys
+
+    // compare kvs
+    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
+    if (e1.getEdit() == null) {
+      return;
+    }
+    List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
+    List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
+    Assert.assertEquals(kvs1.size(), kvs2.size());
+    for (int i = 0; i < kvs1.size(); i++) {
+      // assert the KVs actually compare equal, not merely that compare() runs
+      Assert.assertEquals(0, KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i)));
+    }
+  }
+}
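To see how these filters compose in practice, here is a hedged sketch of a chain like the one a replication source would build, using only classes exercised in this file (the sample entry reuses the test's createEntry helper, so treat it as test-scoped illustration):

    // Drop system-table edits first, then apply per-CF scope rules.
    WALEntryFilter chain = new ChainWALEntryFilter(
        new SystemTableWALEntryFilter(),
        new ScopeWALEntryFilter());
    HLog.Entry kept = chain.filter(createEntry(a, b));
    // kept == null means the entry was filtered out entirely; otherwise its
    // WALEdit contains only the KeyValues that survived every filter.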
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 296f953..9175192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.junit.Before;
@@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
   private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
 
   private ReplicationPeers replicationPeers;
+  private HBaseReplicationEndpoint replicationEndpoint;
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationPeers = mock(ReplicationPeers.class);
+    replicationEndpoint = mock(HBaseReplicationEndpoint.class);
     sinkManager = new ReplicationSinkManager(mock(HConnection.class),
-      PEER_CLUSTER_ID, replicationPeers, new Configuration());
+      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
       serverNames.add(mock(ServerName.class));
     }
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
       .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -72,7 +75,7 @@
     List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
      mock(ServerName.class));
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
      .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -84,8 +87,8 @@ public void testReportBadSink() {
     ServerName serverNameA = mock(ServerName.class);
     ServerName serverNameB = mock(ServerName.class);
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
-      Lists.newArrayList(serverNameA, serverNameB));
+    when(replicationEndpoint.getRegionServers())
+      .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
 
     sinkManager.chooseSinks();
     // Sanity check
@@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
     }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
      .thenReturn(serverNames);
 
@@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
    }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
      .thenReturn(serverNames);
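The common thread in these hunks: sink discovery moved off ReplicationPeers and onto the endpoint, so the tests now stub HBaseReplicationEndpoint.getRegionServers(). A condensed sketch of the new wiring, built from the same mocks the tests use (the peer id string is invented for illustration):

    HBaseReplicationEndpoint endpoint = mock(HBaseReplicationEndpoint.class);
    when(endpoint.getRegionServers())
      .thenReturn(Lists.newArrayList(mock(ServerName.class)));
    ReplicationSinkManager sinkManager = new ReplicationSinkManager(
      mock(HConnection.class), "peer1", endpoint, new Configuration());
    sinkManager.chooseSinks();  // candidate sinks now come from the endpoint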