hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-12028 Abort the RegionServer when its handler threads die (Alicia Ying Shu)
Date Fri, 02 Jan 2015 23:20:16 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 0256cdd1b -> f960f2a90


HBASE-12028 Abort the RegionServer when its handler threads die (Alicia Ying Shu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f960f2a9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f960f2a9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f960f2a9

Branch: refs/heads/branch-1.0
Commit: f960f2a9062a4ab3bccdcd2718f001eed54c9d18
Parents: 0256cdd
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Jan 2 13:07:57 2015 -0800
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jan 2 14:23:49 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |  11 ++
 .../src/main/resources/hbase-default.xml        |   8 +
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |  19 +-
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   5 +-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |  22 ++-
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    |  49 ++++-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  32 +++-
 .../hbase/regionserver/RSRpcServices.java       |   2 +-
 .../hbase/regionserver/RpcSchedulerFactory.java |   7 +-
 .../regionserver/SimpleRpcSchedulerFactory.java |  31 +--
 .../hbase/ipc/TestRpcHandlerException.java      | 189 +++++++++++++++++++
 11 files changed, 340 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 7700812..0ac3fbc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -859,6 +859,17 @@ public final class HConstants {
   public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
   public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
 
+  /*
+   * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
+   * -1  => Disable aborting
+   * 0   => Abort if even a single handler has died
+   * 0.x => Abort only when more than this fraction (e.g. 0.5 = 50%) of handlers has died
+   * 1   => Abort only when all of the handlers have died
+   */
+  public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
+		  "hbase.regionserver.handler.abort.on.error.percent";
+  public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
+
   public static final String REGION_SERVER_META_HANDLER_COUNT =
       "hbase.regionserver.metahandler.count";
   public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 8590da3..04facc0 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1311,4 +1311,12 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
   </property>
+  <property>
+    <name>hbase.regionserver.handler.abort.on.error.percent</name>
+    <value>0.5</value>
+    <description>The fraction of region server RPC handler threads that must fail before the RS is aborted.
+    -1 disables aborting; 0 aborts if even a single handler has died;
+    0.x aborts only when more than this fraction of handlers has died;
+    1 aborts only when all of the handlers have died.</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 3ba842b..56424df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -22,9 +22,11 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -40,12 +42,23 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final int maxQueueLength) {
-    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+    this(name, handlerCount, numQueues, maxQueueLength, null, null);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final int maxQueueLength, final Configuration conf, final Abortable abortable) {
+    this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    this(name, handlerCount, numQueues, null, null,  queueClass, initargs);
   }
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    super(name, Math.max(handlerCount, numQueues));
+    super(name, Math.max(handlerCount, numQueues), conf, abortable);
     queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
     this.balancer = getBalancer(numQueues);
     initializeQueues(numQueues, queueClass, initargs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b89a5d2..56bd96b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.ipc;
  */
 import java.nio.channels.ClosedChannelException;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -111,6 +111,9 @@ public class CallRunner {
         RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(),
e);
         errorThrowable = e;
         error = StringUtils.stringifyException(e);
+        if (e instanceof Error) {
+          throw (Error)e;
+        }
       } finally {
         if (traceScope != null) {
           traceScope.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index deac2f8..2b58680 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@@ -60,25 +62,35 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private final int numScanQueues;
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
-      final float readShare, final int maxQueueLength) {
-    this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+      final float readShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable) {
+    this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
+      conf, abortable, LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final float scanShare, final int maxQueueLength) {
+    this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final float scanShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable) {
     this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
-      LinkedBlockingQueue.class);
+      conf, abortable, LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs)
{
-    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
       readQueueClass, readQueueInitArgs);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final float scanShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs)
{
     this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
       calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1b0934c..709429d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -21,13 +21,17 @@ package org.apache.hadoop.hbase.ipc;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -41,15 +45,26 @@ public abstract class RpcExecutor {
   private final List<Thread> handlers;
   private final int handlerCount;
   private final String name;
+  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
 
   private boolean running;
 
+  private Configuration conf = null;
+  private Abortable abortable = null;
+
   public RpcExecutor(final String name, final int handlerCount) {
     this.handlers = new ArrayList<Thread>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
 
+  public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
+      final Abortable abortable) {
+    this(name, handlerCount);
+    this.conf = conf;
+    this.abortable = abortable;
+  }
+
   public void start(final int port) {
     running = true;
     startHandlers(port);
@@ -94,7 +109,7 @@ public abstract class RpcExecutor {
       });
       t.setDaemon(true);
       t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-          ",queue=" + index + ",port=" + port);
+        ",queue=" + index + ",port=" + port);
       t.start();
       LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
       handlers.add(t);
@@ -103,6 +118,9 @@ public abstract class RpcExecutor {
 
   protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
     boolean interrupted = false;
+    double handlerFailureThreshhold =
+        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     try {
       while (running) {
         try {
@@ -110,9 +128,30 @@ public abstract class RpcExecutor {
           try {
             activeHandlerCount.incrementAndGet();
             task.run();
-          } catch (Throwable t) {
-            LOG.error("RpcServer handler thread throws exception: ", t);
-            throw t;
+          } catch (Throwable e) {
+            if (e instanceof Error) {
+              int failedCount = failedHandlerCount.incrementAndGet();
+              if (handlerFailureThreshhold >= 0
+                  && failedCount > handlerCount * handlerFailureThreshhold) {
+                String message =
+                    "Number of failed RpcServer handler exceeded threshhold "
+                        + handlerFailureThreshhold + "  with failed reason: "
+                        + StringUtils.stringifyException(e);
+                if (abortable != null) {
+                  abortable.abort(message, e);
+                } else {
+                  LOG.error("Received " + StringUtils.stringifyException(e)
+                    + " but not aborting due to abortable being null");
+                  throw e;
+                }
+              } else {
+                LOG.warn("RpcServer handler threads encountered errors "
+                    + StringUtils.stringifyException(e));
+              }
+            } else {
+              LOG.warn("RpcServer handler threads encountered exceptions "
+                  + StringUtils.stringifyException(e));
+            }
           } finally {
             activeHandlerCount.decrementAndGet();
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index dd66570..cbe8adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -22,11 +22,12 @@ import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 
 /**
@@ -93,6 +94,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
 
+  private Abortable abortable = null;
+
   /**
    * @param conf
    * @param handlerCount the number of handler threads that will be used to process calls
@@ -107,11 +110,13 @@ public class SimpleRpcScheduler extends RpcScheduler {
       int priorityHandlerCount,
       int replicationHandlerCount,
       PriorityFunction priority,
+      Abortable server,
       int highPriorityLevel) {
     int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
+    this.abortable = server;
 
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
@@ -127,30 +132,41 @@ public class SimpleRpcScheduler extends RpcScheduler {
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
-            callqReadShare, callqScanShare, maxQueueLength,
+            callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
             BoundedPriorityBlockingQueue.class, callPriority);
       } else {
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
-            callqReadShare, callqScanShare, maxQueueLength);
+          callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
       // multiple queues
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
-            BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+          conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
       } else {
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
-            numCallQueues, maxQueueLength);
+            numCallQueues, maxQueueLength, conf, abortable);
       }
     }
 
    this.priorityExecutor =
      priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
-       1, maxQueueLength) : null;
+       1, maxQueueLength, conf, abortable) : null;
    this.replicationExecutor =
      replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
-       replicationHandlerCount, 1, maxQueueLength) : null;
+       replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+  }
+
+  public SimpleRpcScheduler(
+	      Configuration conf,
+	      int handlerCount,
+	      int priorityHandlerCount,
+	      int replicationHandlerCount,
+	      PriorityFunction priority,
+	      int highPriorityLevel) {
+	  this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
+	    null, highPriorityLevel);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 06e51c6..015c147 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -788,7 +788,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     rpcServer = new RpcServer(rs, name, getServices(),
       initialIsa, // BindAddress is IP we got for this server.
       rs.conf,
-      rpcSchedulerFactory.create(rs.conf, this));
+      rpcSchedulerFactory.create(rs.conf, this, rs));
 
     scannerLeaseTimeoutPeriod = rs.conf.getInt(
       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
index 9e55f1f..f554781 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 
@@ -34,5 +35,9 @@ public interface RpcSchedulerFactory {
   /**
    * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
    */
+  RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);
+
+  @Deprecated
   RpcScheduler create(Configuration conf, PriorityFunction priority);
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index e1dba74..b044a43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
@@ -32,17 +33,25 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
 public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
 
   @Override
+  @Deprecated
   public RpcScheduler create(Configuration conf, PriorityFunction priority) {
+	  return create(conf, priority, null);
+  }
+
+  @Override
+  public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server)
{
     int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+		HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
     return new SimpleRpcScheduler(
-        conf,
-        handlerCount,
-        conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
-            HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
-        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
-            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
-        priority,
-        HConstants.QOS_THRESHOLD);
+      conf,
+      handlerCount,
+      conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
+      conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+      priority,
+      server,
+      HConstants.QOS_THRESHOLD);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f960f2a9/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
new file mode 100644
index 0000000..9cb1cc5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category({SmallTests.class})
+public class TestRpcHandlerException {
+  public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
+  static String example = "xyz";
+  static byte[] CELL_BYTES = example.getBytes();
+  static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+
+  private final static Configuration CONF = HBaseConfiguration.create();
+  RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
+
+  // We are using the test TestRpcServiceProtos generated classes and Service because they are
+  // available and basic, with methods like 'echo' and 'ping'. Below we make a blocking service
+  // by passing in an implementation of the blocking interface. We use this service in all tests
+  // that follow.
+  private static final BlockingService SERVICE =
+      TestRpcServiceProtos.TestProtobufRpcProto
+      .newReflectiveBlockingService(new TestRpcServiceProtos
+		  .TestProtobufRpcProto.BlockingInterface() {
+
+        @Override
+        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+            throws ServiceException {
+          return null;
+        }
+
+        @Override
+        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+            throws ServiceException {
+          return null;
+        }
+
+        @Override
+        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+            throws Error, RuntimeException {
+          if (controller instanceof PayloadCarryingRpcController) {
+            PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
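+            // If there are cells, scan them; since this is an echo, they would be put back on the
+            // controller in a new block (which tests our block building). NOTE: the scan
+            // deliberately throws a StackOverflowError on the first cell to simulate an Error
+            // escaping from an RPC handler, so the echo-back below is only reached when no
+            // cell is scanned.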
+            CellScanner cellScanner = pcrc.cellScanner();
+            List<Cell> list = null;
+            if (cellScanner != null) {
+		list = new ArrayList<Cell>();
+		try {
+			while (cellScanner.advance()) {
+				list.add(cellScanner.current());
+				throw new StackOverflowError();
+			}
+		} catch (StackOverflowError e) {
+			throw e;
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+            }
+            cellScanner = CellUtil.createCellScanner(list);
+            ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+          }
+          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+        }
+      });
+
+  /**
+   * Instance of server. We don't actually do anything special in here, so we could just use
+   * RpcServer directly.
+   */
+  private static class TestRpcServer extends RpcServer {
+
+    TestRpcServer() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1));
+    }
+
+    TestRpcServer(RpcScheduler scheduler) throws IOException {
+      super(null, "testRpcServer",
+		  Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+		  new InetSocketAddress("localhost", 0), CONF, scheduler);
+    }
+
+    @Override
+    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor
md,
+      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+          throws IOException {
+      return super.call(service, md, param, cellScanner, receiveTime, status);
+    }
+  }
+
+  /** Abortable stub that only records whether abort() was called, so the test can check
+   *  that an Error escaping an Rpc handler thread aborts the server.
+   *  When an Rpc handler thread dies, the client will hang and the test will fail.
+   *  The test is meant to be a unit test to test the behavior.
+   * */
+  private class AbortServer implements Abortable {
+    private boolean aborted = false;
+
+    @Override
+    public void abort(String why, Throwable e) {
+      aborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return aborted;
+    }
+  }
+
+  /* Unit test to make sure the region server is aborted when the number of Rpc handler threads
+   * that caught errors exceeds the threshold. The client will hang when the RS aborts.
+   */
+  @Ignore
+  @Test
+  public void testRpcScheduler() throws IOException, InterruptedException {
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    Abortable abortable = new AbortServer();
+    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable,
0);
+    RpcServer rpcServer = new TestRpcServer(scheduler);
+    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
+    try {
+      rpcServer.start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md
+        .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+    } catch (Throwable e) {
+      assert(abortable.isAborted() == true);
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+}


Mime
View raw message