hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject svn commit: r1171285 - in /hbase/branches/0.90: CHANGES.txt src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Date Thu, 15 Sep 2011 21:48:49 GMT
Author: jdcryans
Date: Thu Sep 15 21:48:49 2011
New Revision: 1171285

URL: http://svn.apache.org/viewvc?rev=1171285&view=rev
Log:
   HBASE-4363  [replication] ReplicationSource won't close if failing
               to contact the sink (JD and Lars Hofhansl)

Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1171285&r1=1171284&r2=1171285&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Thu Sep 15 21:48:49 2011
@@ -40,6 +40,8 @@ Release 0.90.5 - Unreleased
    HBASE-4351  If from Admin we try to unassign a region forcefully, though a
                valid region name is given the master is not able to identify the region
                to unassign (Ramkrishna)
+   HBASE-4363  [replication] ReplicationSource won't close if failing
+               to contact the sink (JD and Lars Hofhansl)
 
   IMPROVEMENT
    HBASE-4205  Enhance HTable javadoc (Eric Charles)

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1171285&r1=1171284&r2=1171285&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
(original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Thu Sep 15 21:48:49 2011
@@ -230,7 +230,7 @@ public class ReplicationSource extends T
   public void run() {
     connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
-    if (this.stopper.isStopped()) {
+    if (!this.isActive()) {
       return;
     }
     // If this is recovered, the queue is already full and the first log
@@ -246,7 +246,7 @@ public class ReplicationSource extends T
     }
     int sleepMultiplier = 1;
     // Loop until we close down
-    while (!stopper.isStopped() && this.running) {
+    while (isActive()) {
       // Sleep until replication is enabled again
       if (!this.replicating.get() || !this.sourceEnabled.get()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -329,7 +329,7 @@ public class ReplicationSource extends T
       // If we didn't get anything to replicate, or if we hit a IOE,
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
-      if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
+      if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
         this.manager.logPositionAndCleanOldLogs(this.currentPath,
             this.peerClusterZnode, this.position, queueRecovered);
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -393,7 +393,8 @@ public class ReplicationSource extends T
 
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
-    while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
+    while (this.isActive() && this.currentPeers.size() == 0) {
+
       try {
         chooseSinks();
         Thread.sleep(this.sleepForRetries);
@@ -552,7 +553,7 @@ public class ReplicationSource extends T
       LOG.warn("Was given 0 edits to ship");
       return;
     }
-    while (!this.stopper.isStopped()) {
+    while (this.isActive()) {
       try {
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
@@ -579,6 +580,7 @@ public class ReplicationSource extends T
         }
         try {
           boolean down;
+          // Spin while the slave is down and we're not asked to shutdown/close
           do {
             down = isSlaveDown();
             if (down) {
@@ -588,7 +590,7 @@ public class ReplicationSource extends T
                 chooseSinks();
               }
             }
-          } while (!this.stopper.isStopped() && down);
+          } while (this.isActive() && down );
         } catch (InterruptedException e) {
           LOG.debug("Interrupted while trying to contact the peer cluster");
         } catch (KeeperException e) {
@@ -708,6 +710,10 @@ public class ReplicationSource extends T
     this.sourceEnabled.set(status);
   }
 
+  private boolean isActive() {
+    return !this.stopper.isStopped() && this.running;
+  }
+
   /**
    * Comparator used to compare logs together based on their start time
    */



Mime
View raw message