hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject svn commit: r1304635 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Date Fri, 23 Mar 2012 22:32:47 GMT
Author: jdcryans
Date: Fri Mar 23 22:32:46 2012
New Revision: 1304635

URL: http://svn.apache.org/viewvc?rev=1304635&view=rev
Log:
HBASE-5190  Limit the IPC queue size based on calls' payload size

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1304635&r1=1304634&r2=1304635&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Mar 23 22:32:46 2012
@@ -75,6 +75,8 @@ import org.apache.hadoop.util.StringUtil
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.cliffc.high_scale_lib.Counter;
+
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -95,7 +97,13 @@ public abstract class HBaseServer implem
   /**
    * How many calls/handler are allowed in the queue.
    */
-  private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10;
+  private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+
+  /**
+   * The maximum size that we can hold in the IPC queue
+   */
+  private static final int DEFAULT_MAX_CALLQUEUE_SIZE =
+    1024 * 1024 * 1024;
 
   static final int BUFFER_INITIAL_SIZE = 1024;
 
@@ -191,6 +199,7 @@ public abstract class HBaseServer implem
 
   protected Configuration conf;
 
+  private int maxQueueLength;
   private int maxQueueSize;
   protected int socketSendBufferSize;
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
@@ -199,6 +208,7 @@ public abstract class HBaseServer implem
 
   volatile protected boolean running = true;         // true while server runs
   protected BlockingQueue<Call> callQueue; // queued calls
+  protected final Counter callQueueSize = new Counter();
   protected BlockingQueue<Call> priorityCallQueue;
 
   protected int highPriorityLevel;  // what level a high priority call is at
@@ -257,10 +267,11 @@ public abstract class HBaseServer implem
     protected Responder responder;
     protected boolean delayReturnValue;           // if the return value should be
                                                   // set at call completion
+    protected long size;                          // size of current call
     protected boolean isError;
 
     public Call(int id, Writable param, Connection connection,
-        Responder responder) {
+        Responder responder, long size) {
       this.id = id;
       this.param = param;
       this.connection = connection;
@@ -269,6 +280,7 @@ public abstract class HBaseServer implem
       this.delayResponse = false;
       this.responder = responder;
       this.isError = false;
+      this.size = size;
     }
 
     @Override
@@ -401,6 +413,10 @@ public abstract class HBaseServer implem
       return this.delayReturnValue;
     }
 
+    public long getSize() {
+      return this.size;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is
@@ -1197,7 +1213,7 @@ public abstract class HBaseServer implem
         // we return 0 which will keep the socket up -- bad clients, unless
         // they switch to suit the running server -- will fail later doing
         // getProtocolVersion.
-        Call fakeCall =  new Call(0, null, this, responder);
+        Call fakeCall =  new Call(0, null, this, responder, 0);
         // Versions 3 and greater can interpret this exception
         // response in the same manner
         setupResponse(buffer, fakeCall, Status.FATAL,
@@ -1229,9 +1245,23 @@ public abstract class HBaseServer implem
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
       int id = dis.readInt();                    // try to read an id
+      long callSize = buf.length;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" got call #" + id + ", " + callSize + " bytes");
+      }
 
-      if (LOG.isDebugEnabled())
-        LOG.debug(" got call #" + id + ", " + buf.length + " bytes");
+      // Enforcing the call queue size, this triggers a retry in the client
+      if ((callSize + callQueueSize.get()) > maxQueueSize) {
+        final Call callTooBig =
+          new Call(id, null, this, responder, callSize);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+        setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
+            IOException.class.getName(),
+            "Call queue is full, is ipc.server.max.callqueue.size too small?");
+        responder.doRespond(callTooBig);
+        return;
+      }
 
       Writable param;
       try {
@@ -1240,7 +1270,8 @@ public abstract class HBaseServer implem
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this, responder);
+        final Call readParamsFailedCall =
+          new Call(id, null, this, responder, callSize);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1249,7 +1280,8 @@ public abstract class HBaseServer implem
         responder.doRespond(readParamsFailedCall);
         return;
       }
-      Call call = new Call(id, param, this, responder);
+      Call call = new Call(id, param, this, responder, callSize);
+      callQueueSize.add(callSize);
 
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
@@ -1353,7 +1385,7 @@ public abstract class HBaseServer implem
             RequestContext.clear();
           }
           CurCall.set(null);
-
+          callQueueSize.add(call.getSize() * -1);
           // Set the response for undelayed calls and delayed calls with
           // undelayed responses.
           if (!call.isDelayed() || !call.isReturnValueDelayed()) {
@@ -1436,15 +1468,29 @@ public abstract class HBaseServer implem
     this.handlerCount = handlerCount;
     this.priorityHandlerCount = priorityHandlerCount;
     this.socketSendBufferSize = 0;
+
+    // temporary backward compatibility
+    String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size");
+    if (oldMaxQueueSize == null) {
+      this.maxQueueLength =
+        this.conf.getInt("ipc.server.max.callqueue.length",
+          handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    } else {
+      LOG.warn("ipc.server.max.queue.size was renamed " +
+               "ipc.server.max.callqueue.length, " +
+               "please update your configuration");
+      this.maxQueueLength = Integer.getInteger(oldMaxQueueSize);
+    }
+
     this.maxQueueSize =
-      this.conf.getInt("ipc.server.max.queue.size",
-        handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER);
+      this.conf.getInt("ipc.server.max.callqueue.size",
+        DEFAULT_MAX_CALLQUEUE_SIZE);
      this.readThreads = conf.getInt(
         "ipc.server.read.threadpool.size",
         10);
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
+    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueLength);
     if (priorityHandlerCount > 0) {
-      this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize); // TODO hack on size
+      this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
     } else {
       this.priorityCallQueue = null;
     }



Mime
View raw message