From: jdcryans@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Subject: svn commit: r1084785 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Date: Wed, 23 Mar 2011 22:23:54 -0000
Message-Id: <20110323222354.BBD9D23888FD@eris.apache.org>

Author: jdcryans
Date: Wed Mar 23 22:23:54 2011
New Revision: 1084785

URL: http://svn.apache.org/viewvc?rev=1084785&view=rev
Log:
HBASE-3640 [replication] Transferring queues shouldn't be done inline with RS startup

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Mar 23 22:23:54 2011
@@ -182,6 +182,7 @@ Release 0.90.2 - Unreleased
    HBASE-3496  HFile CLI Improvements
    HBASE-3596  [replication] Wait a few seconds before transferring queues
    HBASE-3600  Update our jruby to 1.6.0
+   HBASE-3640  [replication] Transferring queues shouldn't be done inline with RS startup
 
 
 Release 0.90.1 - February 9th, 2011

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Mar 23 22:23:54 2011
@@ -27,8 +27,13 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -78,6 +83,8 @@ public class ReplicationSourceManager {
   private final Path oldLogDir;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
+  // Homemade executor service for replication
+  private final ThreadPoolExecutor executor;
 
   /**
    * Creates a replication manager and sets the watch on all the other
@@ -116,6 +123,17 @@ public class ReplicationSourceManager {
         new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
     this.zkHelper.listPeersIdsAndWatch();
     this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+    // It's preferable to failover 1 RS at a time, but with good zk servers
+    // more could be processed at the same time.
+    int nbWorkers = conf.getInt("replication.executor.workers", 1);
+    // use a short 100ms sleep since this could be done inline with a RS startup
+    // even if we fail, other region servers can take care of it
+    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
+        100, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setNameFormat("ReplicationExecutor-%d");
+    this.executor.setThreadFactory(tfb.build());
   }
 
   /**
@@ -199,6 +217,7 @@ public class ReplicationSourceManager {
    * Terminate the replication on this region server
    */
   public void join() {
+    this.executor.shutdown();
     if (this.sources.size() == 0) {
       this.zkHelper.deleteOwnRSZNode();
     }
   }
@@ -298,50 +317,12 @@
    * @param rsZnode
    */
   public void transferQueues(String rsZnode) {
-    // Wait a bit before transferring the queues, we may be shutting down.
-    // This sleep may not be enough in some cases.
+    NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
     try {
-      Thread.sleep(this.sleepBeforeFailover);
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting before transferring a queue.");
-      Thread.currentThread().interrupt();
-    }
-    // We try to lock that rs' queue directory
-    if (this.stopper.isStopped()) {
-      LOG.info("Not transferring queue since we are shutting down");
-      return;
-    }
-    if (!this.zkHelper.lockOtherRS(rsZnode)) {
-      return;
-    }
-    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-    SortedMap<String, SortedSet<String>> newQueues =
-        this.zkHelper.copyQueuesFromRS(rsZnode);
-    this.zkHelper.deleteRsQueues(rsZnode);
-    if (newQueues == null || newQueues.size() == 0) {
-      return;
-    }
-
-    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
-      String peerId = entry.getKey();
-      try {
-        ReplicationSourceInterface src = getReplicationSource(this.conf,
-            this.fs, this, this.stopper, this.replicating, peerId);
-        if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
-          src.terminate("Recovered queue doesn't belong to any current peer");
-          break;
-        }
-        this.oldsources.add(src);
-        for (String hlog : entry.getValue()) {
-          src.enqueueLog(new Path(this.oldLogDir, hlog));
-        }
-        // TODO set it to what's in ZK
-        src.setSourceEnabled(true);
-        src.startup();
-      } catch (IOException e) {
-        // TODO manage it
-        LOG.error("Failed creating a source", e);
-      }
+      this.executor.execute(transfer);
+    } catch (RejectedExecutionException ex) {
+      LOG.info("Cancelling the transfer of " + rsZnode +
+          " because of " + ex.getMessage());
     }
   }
 
@@ -525,6 +506,73 @@
   }
 
   /**
+   * Class responsible for setting up new ReplicationSources to take care of
+   * the queues from dead region servers.
+   */
+  class NodeFailoverWorker extends Thread {
+
+    private String rsZnode;
+
+    /**
+     *
+     * @param rsZnode
+     */
+    public NodeFailoverWorker(String rsZnode) {
+      super("Failover-for-"+rsZnode);
+      this.rsZnode = rsZnode;
+    }
+
+    @Override
+    public void run() {
+      // Wait a bit before transferring the queues, we may be shutting down.
+      // This sleep may not be enough in some cases.
+      try {
+        Thread.sleep(sleepBeforeFailover);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting before transferring a queue.");
+        Thread.currentThread().interrupt();
+      }
+      // We try to lock that rs' queue directory
+      if (stopper.isStopped()) {
+        LOG.info("Not transferring queue since we are shutting down");
+        return;
+      }
+      if (!zkHelper.lockOtherRS(rsZnode)) {
+        return;
+      }
+      LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+      SortedMap<String, SortedSet<String>> newQueues =
+          zkHelper.copyQueuesFromRS(rsZnode);
+      zkHelper.deleteRsQueues(rsZnode);
+      if (newQueues == null || newQueues.size() == 0) {
+        return;
+      }
+
+      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+        String peerId = entry.getKey();
+        try {
+          ReplicationSourceInterface src = getReplicationSource(conf,
+              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+          if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+            src.terminate("Recovered queue doesn't belong to any current peer");
+            break;
+          }
+          oldsources.add(src);
+          for (String hlog : entry.getValue()) {
+            src.enqueueLog(new Path(oldLogDir, hlog));
+          }
+          // TODO set it to what's in ZK
+          src.setSourceEnabled(true);
+          src.startup();
+        } catch (IOException e) {
+          // TODO manage it
+          LOG.error("Failed creating a source", e);
+        }
+      }
+    }
+  }
+
+  /**
   * Get the directory where hlogs are archived
   * @return the directory where hlogs are archived
   */
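
For context on the pattern this patch introduces: transferQueues() no longer blocks its caller (the region server startup path). It wraps the work in a NodeFailoverWorker and hands it to the small ThreadPoolExecutor built in the constructor, and once join() has called shutdown(), any later execute() fails with RejectedExecutionException, which is simply logged because another region server can pick up the queues. Below is a minimal, standalone sketch of that executor/rejection pattern, not HBase code; the class name FailoverExecutorSketch and the plain ThreadFactory are illustrative stand-ins (the patch itself names its threads with Guava's ThreadFactoryBuilder and the "ReplicationExecutor-%d" format).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FailoverExecutorSketch {

  public static void main(String[] args) throws InterruptedException {
    // Mirrors "replication.executor.workers": one failover at a time by default.
    int nbWorkers = 1;

    // Fixed-size pool backed by an unbounded queue, as in the patch.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    // The patch uses Guava's ThreadFactoryBuilder for naming; a plain
    // ThreadFactory gives the same effect without the dependency.
    executor.setThreadFactory(new ThreadFactory() {
      private final AtomicInteger count = new AtomicInteger();
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "ReplicationExecutor-" + count.getAndIncrement());
      }
    });

    // Stand-in for NodeFailoverWorker: the real one sleeps, locks the dead
    // region server's znode, copies its queues and starts recovered sources.
    Runnable transfer = new Runnable() {
      @Override
      public void run() {
        System.out.println(Thread.currentThread().getName()
            + ": transferring queues in the background");
      }
    };

    // Queued and run asynchronously; the caller (think RS startup) returns at once.
    executor.execute(transfer);

    // What join() now does: stop accepting new transfers.
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);

    try {
      executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      // The patch logs and drops the transfer; another RS will pick it up.
      System.out.println("Cancelling the transfer because of " + ex.getMessage());
    }
  }
}

Run as-is, this prints the background message from a thread named ReplicationExecutor-0 and then the cancellation notice, mirroring the log line added to transferQueues() above.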