hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [4/5] hbase git commit: HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer
Date Wed, 19 Nov 2014 17:31:30 GMT
HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/865ffebf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/865ffebf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/865ffebf

Branch: refs/heads/branch-1
Commit: 865ffebfc6989c79ff84a43ddb39df8067a28d8b
Parents: 6e376b9
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Wed Nov 19 16:50:29 2014 +0000
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Wed Nov 19 16:51:26 2014 +0000

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     | 48 +-------------------
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    | 14 ++++--
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 44 ++++++++++++++++++
 3 files changed, 54 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/865ffebf/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 40ba9fe..2418cf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still
remains
  * efficient with a single queue via an inlinable queue balancing mechanism.
@@ -39,7 +37,7 @@ import com.google.common.base.Preconditions;
 public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   protected final List<BlockingQueue<CallRunner>> queues;
-  private QueueBalancer balancer;
+  private final QueueBalancer balancer;
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final int maxQueueLength) {
@@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
-
-  private static abstract class QueueBalancer {
-    /**
-     * @return the index of the next queue to which a request should be inserted
-     */
-    public abstract int getNextQueue();
-  }
-
-  public static QueueBalancer getBalancer(int queueSize) {
-    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least
1");
-    if (queueSize == 1) {
-      return ONE_QUEUE;
-    } else {
-      return new RandomQueueBalancer(queueSize);
-    }
-  }
-
-  /**
-   * All requests go to the first queue, at index 0
-   */
-  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
-
-    @Override
-    public int getNextQueue() {
-      return 0;
-    }
-  };
-
-  /**
-   * Queue balancer that just randomly selects a queue in the range [0, num queues).
-   */
-  private static class RandomQueueBalancer extends QueueBalancer {
-    private int queueSize;
-    private Random random;
-
-    public RandomQueueBalancer(int queueSize) {
-      this.queueSize = queueSize;
-      this.random = new Random();
-    }
-
-    public int getNextQueue() {
-      return random.nextInt(queueSize);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/865ffebf/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 4f0448d..deac2f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -50,7 +49,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
 
   private final List<BlockingQueue<CallRunner>> queues;
-  private final Random balancer = new Random();
+  private final QueueBalancer writeBalancer;
+  private final QueueBalancer readBalancer;
+  private final QueueBalancer scanBalancer;
   private final int writeHandlersCount;
   private final int readHandlersCount;
   private final int scanHandlersCount;
@@ -115,6 +116,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     this.numWriteQueues = numWriteQueues;
     this.numReadQueues = numReadQueues;
     this.numScanQueues = numScanQueues;
+    this.writeBalancer = getBalancer(numWriteQueues);
+    this.readBalancer = getBalancer(numReadQueues);
+    this.scanBalancer = getBalancer(numScanQueues);
 
     queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
     LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
+
@@ -146,11 +150,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     RpcServer.Call call = callTask.getCall();
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
-      queueIndex = balancer.nextInt(numWriteQueues);
+      queueIndex = writeBalancer.getNextQueue();
     } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param))
{
-      queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
+      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
     } else {
-      queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
+      queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
     queues.get(queueIndex).put(callTask);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/865ffebf/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 5fe2590..1b0934c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -22,12 +22,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
 @InterfaceAudience.Private
@@ -124,4 +126,46 @@ public abstract class RpcExecutor {
       }
     }
   }
+
+  public static abstract class QueueBalancer {
+    /**
+     * @return the index of the next queue to which a request should be inserted
+     */
+    public abstract int getNextQueue();
+  }
+
+  public static QueueBalancer getBalancer(int queueSize) {
+    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least
1");
+    if (queueSize == 1) {
+      return ONE_QUEUE;
+    } else {
+      return new RandomQueueBalancer(queueSize);
+    }
+  }
+
+  /**
+   * All requests go to the first queue, at index 0
+   */
+  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
+
+    @Override
+    public int getNextQueue() {
+      return 0;
+    }
+  };
+
+  /**
+   * Queue balancer that just randomly selects a queue in the range [0, num queues).
+   */
+  private static class RandomQueueBalancer extends QueueBalancer {
+    private final int queueSize;
+
+    public RandomQueueBalancer(int queueSize) {
+      this.queueSize = queueSize;
+    }
+
+    public int getNextQueue() {
+      return ThreadLocalRandom.current().nextInt(queueSize);
+    }
+  }
 }


Mime
View raw message