Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B6129D222 for ; Wed, 7 Nov 2012 20:15:11 +0000 (UTC) Received: (qmail 26152 invoked by uid 500); 7 Nov 2012 20:15:11 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 26089 invoked by uid 500); 7 Nov 2012 20:15:11 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 26082 invoked by uid 99); 7 Nov 2012 20:15:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Nov 2012 20:15:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Nov 2012 20:15:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0A022238897F for ; Wed, 7 Nov 2012 20:14:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1406785 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Date: Wed, 07 Nov 2012 20:14:46 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121107201447.0A022238897F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Wed Nov 7 20:14:46 2012 New Revision: 1406785 URL: http://svn.apache.org/viewvc?rev=1406785&view=rev Log: [HBASE-7100] Allow multiple connections from HBaseClient to each remote endpoint Author: kranganathan Summary: Allows a 
conf-controlled set of connections per server. With this, the get benchmark from one client to one server is now able to do 145K to 152K gets/sec. Note that the first param that the HBaseClient is created with is respected, all future confs are ignored. Test Plan: Tested by running the get benchmark. Reviewers: kannan, liyintang Reviewed By: kannan CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D621225 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1406785&r1=1406784&r2=1406785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Nov 7 20:14:46 2012 @@ -43,6 +43,7 @@ import java.net.UnknownHostException; import java.util.Hashtable; import java.util.Iterator; import java.util.Map.Entry; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,9 +84,9 @@ public class HBaseClient { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); // Active connections are stored in connections. 
- protected final ConcurrentMap<ConnectionId, Connection> connections = + protected final ConcurrentMap<ConnectionId, Connection> connections = new ConcurrentHashMap<ConnectionId, Connection>(); - + protected int counter; // counter for call ids protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; @@ -98,6 +99,12 @@ public class HBaseClient { protected int pingInterval; // how often sends ping to the server in msecs private final int connectionTimeOutMillSec; // the connection time out + private final int numConnectionsPerServer; + public static final String NUM_CONNECTIONS_PER_SERVER = + "hbase.client.max.connections.per.server"; + public static final int DEFAULT_NUM_CONNECTIONS_PER_SERVER = 1; + private Random random = new Random(); + protected final SocketFactory socketFactory; // how to create sockets final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; @@ -186,7 +193,7 @@ public class HBaseClient { return version; } } - + /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ @@ -785,6 +792,10 @@ public class HBaseClient { this.socketFactory = factory; this.connectionTimeOutMillSec = conf.getInt("hbase.client.connection.timeout.millsec", 5000); + this.numConnectionsPerServer = conf.getInt(NUM_CONNECTIONS_PER_SERVER, + DEFAULT_NUM_CONNECTIONS_PER_SERVER); + LOG.debug("Created a new HBaseClient with " + numConnectionsPerServer + + " connections per remote server."); } /** @@ -974,12 +985,15 @@ public class HBaseClient { } call.setVersion(version); Connection connection; + // if multiple connections are enabled per remote, get a random one + int connectionNum = (numConnectionsPerServer > 1)? + random.nextInt(numConnectionsPerServer):0; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. 
For now its ok. */ ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout, - call.getVersion()); + call.getVersion(), connectionNum); do { connection = connections.get(remoteId); if (connection == null) { @@ -1056,13 +1070,15 @@ public class HBaseClient { final UserGroupInformation ticket; final private int rpcTimeout; final private int version; + final private int connectionNum; ConnectionId(InetSocketAddress address, UserGroupInformation ticket, - int rpcTimeout, int version) { + int rpcTimeout, int version, int connectionNum) { this.address = address; this.ticket = ticket; this.rpcTimeout = rpcTimeout; this.version = version; + this.connectionNum = connectionNum; } InetSocketAddress getAddress() { @@ -1077,7 +1093,8 @@ public class HBaseClient { if (obj instanceof ConnectionId) { ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && ticket == id.ticket && - rpcTimeout == id.rpcTimeout && version == id.version; + rpcTimeout == id.rpcTimeout && version == id.version && + connectionNum == id.connectionNum; //Note : ticket is a ref comparision. } return false; @@ -1086,7 +1103,7 @@ public class HBaseClient { @Override public int hashCode() { return address.hashCode() ^ System.identityHashCode(ticket) ^ - rpcTimeout ^ version; + rpcTimeout ^ version ^ connectionNum; } } }