hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1570219 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver: HRegionServer.java SplitLogWorker.java
Date Thu, 20 Feb 2014 15:16:46 GMT
Author: nkeywal
Date: Thu Feb 20 15:16:46 2014
New Revision: 1570219

URL: http://svn.apache.org/r1570219
Log:
HBASE-10524 Correct wrong handling and add proper handling for swallowed InterruptedException
thrown by Thread.sleep in regionserver (Feng Honghua)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1570219&r1=1570218&r2=1570219&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Thu Feb 20 15:16:46 2014
@@ -1939,45 +1939,54 @@ public class HRegionServer implements Cl
     RegionServerStatusService.BlockingInterface master = null;
     boolean refresh = false; // for the first time, use cached data
     RegionServerStatusService.BlockingInterface intf = null;
-    while (keepLooping() && master == null) {
-      sn = this.masterAddressManager.getMasterAddress(refresh);
-      if (sn == null) {
-        if (!keepLooping()) {
-          // give up with no connection.
-          LOG.debug("No master found and cluster is stopped; bailing out");
-          return null;
-        }
-        LOG.debug("No master found; retry");
-        previousLogTime = System.currentTimeMillis();
-        refresh = true; // let's try pull it from ZK directly
-        sleeper.sleep();
-        continue;
-      }
+    boolean interrupted = false;
+    try {
+      while (keepLooping() && master == null) {
+        sn = this.masterAddressManager.getMasterAddress(refresh);
+        if (sn == null) {
+          if (!keepLooping()) {
+            // give up with no connection.
+            LOG.debug("No master found and cluster is stopped; bailing out");
+            return null;
+          }
+          LOG.debug("No master found; retry");
+          previousLogTime = System.currentTimeMillis();
+          refresh = true; // let's try pull it from ZK directly
+          sleeper.sleep();
+          continue;
+        }
 
-      new InetSocketAddress(sn.getHostname(), sn.getPort());
-      try {
-        BlockingRpcChannel channel =
+        new InetSocketAddress(sn.getHostname(), sn.getPort());
+        try {
+          BlockingRpcChannel channel =
             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
-        intf = RegionServerStatusService.newBlockingStub(channel);
-        break;
-      } catch (IOException e) {
-        e = e instanceof RemoteException ?
+          intf = RegionServerStatusService.newBlockingStub(channel);
+          break;
+        } catch (IOException e) {
+          e = e instanceof RemoteException ?
             ((RemoteException)e).unwrapRemoteException() : e;
-        if (e instanceof ServerNotRunningYetException) {
-          if (System.currentTimeMillis() > (previousLogTime+1000)){
-            LOG.info("Master isn't available yet, retrying");
-            previousLogTime = System.currentTimeMillis();
+          if (e instanceof ServerNotRunningYetException) {
+            if (System.currentTimeMillis() > (previousLogTime+1000)){
+              LOG.info("Master isn't available yet, retrying");
+              previousLogTime = System.currentTimeMillis();
+            }
+          } else {
+            if (System.currentTimeMillis() > (previousLogTime + 1000)) {
+              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
+              previousLogTime = System.currentTimeMillis();
+            }
           }
-        } else {
-          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
-            LOG.warn("Unable to connect to master. Retrying. Error was:", e);
-            previousLogTime = System.currentTimeMillis();
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException ex) {
+            interrupted = true;
+            LOG.warn("Interrupted while sleeping");
           }
         }
-        try {
-          Thread.sleep(200);
-        } catch (InterruptedException ignored) {
-        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
       }
     }
     return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1570219&r1=1570218&r2=1570219&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
Thu Feb 20 15:16:46 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -190,26 +191,23 @@ public class SplitLogWorker extends ZooK
           LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
         }
         if (res == -1) {
-          try {
-            LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
-                + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
-                "exiting anyway)"));
-            exitWorker = true;
-            break;
-          }
+          LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
+          Thread.sleep(1000);
         }
       }
 
       if (!exitWorker) {
-        taskLoop();
+          taskLoop();
       }
     } catch (Throwable t) {
-      // only a logical error can cause here. Printing it out
-      // to make debugging easier
-      LOG.error("unexpected error ", t);
+      if (ExceptionUtil.isInterrupt(t)) {
+        LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" :
+            " (ERROR: exitWorker is not set, exiting anyway)"));
+      } else {
+        // only a logical error can cause here. Printing it out
+        // to make debugging easier
+        LOG.error("unexpected error ", t);
+      }
     } finally {
       LOG.info("SplitLogWorker " + this.serverName + " exiting");
     }
@@ -223,7 +221,7 @@ public class SplitLogWorker extends ZooK
    * Synchronization using {@link #taskReadyLock} ensures that it will
    * try to grab every task that has been put up
    */
-  private void taskLoop() {
+  private void taskLoop() throws InterruptedException {
     while (!exitWorker) {
       int seq_start = taskReadySeq;
       List<String> paths = getTaskList();
@@ -259,50 +257,41 @@ public class SplitLogWorker extends ZooK
       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
       synchronized (taskReadyLock) {
         while (seq_start == taskReadySeq) {
-          try {
-            taskReadyLock.wait(checkInterval);
-            if (this.server != null) {
-              // check to see if we have stale recovering regions in our internal memory state
-              Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
-              if (!recoveringRegions.isEmpty()) {
-                // Make a local copy to prevent ConcurrentModificationException when other threads
-                // modify recoveringRegions
-                List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
-                for (String region : tmpCopy) {
-                  String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
-                  try {
-                    if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
-                      HRegion r = recoveringRegions.remove(region);
-                      if (r != null) {
-                        r.setRecovering(false);
-                      }
-                      LOG.debug("Mark recovering region:" + region + " up.");
-                    } else {
-                      // current check is a defensive(or redundant) mechanism to prevent us from
-                      // having stale recovering regions in our internal RS memory state while
-                      // zookeeper(source of truth) says differently. We stop at the first good one
-                      // because we should not have a single instance such as this in normal case so
-                      // check the first one is good enough.
-                      break;
+          taskReadyLock.wait(checkInterval);
+          if (this.server != null) {
+            // check to see if we have stale recovering regions in our internal memory state
+            Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
+            if (!recoveringRegions.isEmpty()) {
+              // Make a local copy to prevent ConcurrentModificationException when other threads
+              // modify recoveringRegions
+              List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+              for (String region : tmpCopy) {
+                String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
+                try {
+                  if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
+                    HRegion r = recoveringRegions.remove(region);
+                    if (r != null) {
+                      r.setRecovering(false);
                     }
-                  } catch (KeeperException e) {
-                    // ignore zookeeper error
-                    LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+                    LOG.debug("Mark recovering region:" + region + " up.");
+                  } else {
+                    // current check is a defensive(or redundant) mechanism to prevent us from
+                    // having stale recovering regions in our internal RS memory state while
+                    // zookeeper(source of truth) says differently. We stop at the first good one
+                    // because we should not have a single instance such as this in normal case so
+                    // check the first one is good enough.
                     break;
                   }
+                } catch (KeeperException e) {
+                  // ignore zookeeper error
+                  LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+                  break;
                 }
               }
             }
-          } catch (InterruptedException e) {
-            LOG.info("SplitLogWorker interrupted while waiting for task," +
-                " exiting: " + e.toString() + (exitWorker ? "" :
-                " (ERROR: exitWorker is not set, exiting anyway)"));
-            exitWorker = true;
-            return;
           }
         }
       }
-
     }
   }
 
@@ -559,7 +548,7 @@ public class SplitLogWorker extends ZooK
   }
 
 
-  private List<String> getTaskList() {
+  private List<String> getTaskList() throws InterruptedException {
     List<String> childrenPaths = null;
     long sleepTime = 1000;
     // It will be in loop till it gets the list of children or
@@ -575,14 +564,9 @@ public class SplitLogWorker extends ZooK
         LOG.warn("Could not get children of znode "
             + this.watcher.splitLogZNode, e);
       }
-      try {
         LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
             + " after sleep for " + sleepTime + "ms!");
         Thread.sleep(sleepTime);
-      } catch (InterruptedException e1) {
-        LOG.warn("Interrupted while trying to get task list ...", e1);
-        Thread.currentThread().interrupt();
-      }
     }
     return childrenPaths;
   }



Mime
View raw message