hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jg...@apache.org
Subject svn commit: r1029776 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/regionserver/
Date Mon, 01 Nov 2010 18:01:57 GMT
Author: jgray
Date: Mon Nov  1 18:01:57 2010
New Revision: 1029776

URL: http://svn.apache.org/viewvc?rev=1029776&view=rev
Log:
HBASE-3154  HBase RPC should support timeout (Hairong via jgray)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Nov  1 18:01:57 2010
@@ -1079,6 +1079,7 @@ Release 0.21.0 - Unreleased
                cacheBlocks=true
    HBASE-3126  Force use of 'mv -f' when moving aside hbase logfiles
    HBASE-3176  Remove compile warnings in HRegionServer
+   HBASE-3154  HBase RPC should support timeout (Hairong via jgray)
 
 
   NEW FEATURES

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Mon Nov  1 18:01:57 2010
@@ -365,6 +365,16 @@ public final class HConstants {
    * Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}.
    */
   public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
+  
+  /**
+   * timeout for each RPC
+   */
+  public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
+  
+  /**
+   * Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
+   */
+  public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
 
   public static final String
       REPLICATION_ENABLE_KEY = "hbase.replication";

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Nov  1 18:01:57 2010
@@ -224,7 +224,7 @@ public class HConnectionManager {
     private final long pause;
     private final int numRetries;
     private final int maxRPCAttempts;
-    private final long rpcTimeout;
+    private final int rpcTimeout;
     private final int prefetchRegionLimit;
 
     private final Object masterLock = new Object();
@@ -282,9 +282,9 @@ public class HConnectionManager {
       this.pause = conf.getLong("hbase.client.pause", 1000);
       this.numRetries = conf.getInt("hbase.client.retries.number", 10);
       this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
-      this.rpcTimeout = conf.getLong(
-          HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-          HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+      this.rpcTimeout = conf.getInt(
+          HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
       this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
           10);
@@ -341,7 +341,7 @@ public class HConnectionManager {
 
             HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
                 HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
-                masterLocation.getInetSocketAddress(), this.conf);
+                masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout);
 
             if (tryMaster.isMasterRunning()) {
               this.master = tryMaster;
@@ -936,7 +936,7 @@ public class HConnectionManager {
             server = (HRegionInterface)HBaseRPC.waitForProxy(
                 serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
                 regionServer.getInetSocketAddress(), this.conf,
-                this.maxRPCAttempts, this.rpcTimeout);
+                this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
           } catch (RemoteException e) {
             LOG.warn("Remove exception connecting to RS", e);
             throw RemoteExceptionHandler.decodeRemoteException(e);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Mon Nov  1 18:01:57 2010
@@ -79,7 +79,7 @@ public class HBaseClient {
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected final int pingInterval; // how often sends ping to the server in msecs
+  protected int pingInterval; // how often sends ping to the server in msecs
 
   protected final SocketFactory socketFactory;           // how to create sockets
   private int refCount = 1;
@@ -194,7 +194,7 @@ public class HBaseClient {
     private IOException closeException; // close reason
 
     public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null));
+      this(new ConnectionId(address, null, 0));
     }
 
     public Connection(ConnectionId remoteId) throws IOException {
@@ -245,7 +245,8 @@ public class HBaseClient {
        * otherwise, throw the timeout exception.
        */
       private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get()) {
+        if (shouldCloseConnection.get() || !running.get() || 
+            remoteId.rpcTimeout > 0) {
           throw e;
         }
         sendPing();
@@ -308,6 +309,9 @@ public class HBaseClient {
             this.socket.setKeepAlive(tcpKeepAlive);
             // connection time out is 20s
             NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+            if (remoteId.rpcTimeout > 0) {
+              pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
+            }
             this.socket.setSoTimeout(pingInterval);
             break;
           } catch (SocketTimeoutException toe) {
@@ -718,14 +722,14 @@ public class HBaseClient {
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws IOException {
-      return call(param, address, null);
+      return call(param, address, null, 0);
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
-                       UserGroupInformation ticket)
+                       UserGroupInformation ticket, int rpcTimeout)
                        throws IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, ticket, call);
+    Connection connection = getConnection(addr, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -808,7 +812,7 @@ public class HBaseClient {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null, call);
+          Connection connection = getConnection(addresses[i], null, 0, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -831,6 +835,7 @@ public class HBaseClient {
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress addr,
                                    UserGroupInformation ticket,
+                                   int rpcTimeout,
                                    Call call)
                                    throws IOException {
     if (!running.get()) {
@@ -842,7 +847,7 @@ public class HBaseClient {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket);
+    ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -868,10 +873,13 @@ public class HBaseClient {
   private static class ConnectionId {
     final InetSocketAddress address;
     final UserGroupInformation ticket;
+    final private int rpcTimeout;
 
-    ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+    ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
+        int rpcTimeout) {
       this.address = address;
       this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
     }
 
     InetSocketAddress getAddress() {
@@ -885,7 +893,8 @@ public class HBaseClient {
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
-       return address.equals(id.address) && ticket == id.ticket;
+       return address.equals(id.address) && ticket == id.ticket && 
+       rpcTimeout == id.rpcTimeout;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -893,7 +902,7 @@ public class HBaseClient {
 
     @Override
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket);
+      return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
     }
   }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Mon Nov  1 18:01:57 2010
@@ -231,6 +231,7 @@ public class HBaseRPC {
     private UserGroupInformation ticket;
     private HBaseClient client;
     private boolean isClosed = false;
+    final private int rpcTimeout;
 
     /**
      * @param address address for invoker
@@ -239,10 +240,11 @@ public class HBaseRPC {
      * @param factory socket factory
      */
     public Invoker(InetSocketAddress address, UserGroupInformation ticket,
-                   Configuration conf, SocketFactory factory) {
+                   Configuration conf, SocketFactory factory, int rpcTimeout) {
       this.address = address;
       this.ticket = ticket;
       this.client = CLIENTS.getClient(conf, factory);
+      this.rpcTimeout = rpcTimeout;
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -253,7 +255,7 @@ public class HBaseRPC {
         startTime = System.currentTimeMillis();
       }
       HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address, ticket);
+        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -324,6 +326,7 @@ public class HBaseRPC {
    * @param addr address of remote service
    * @param conf configuration
    * @param maxAttempts max attempts
+   * @param rpcTimeout timeout for each RPC
    * @param timeout timeout in milliseconds
    * @return proxy
    * @throws IOException e
@@ -334,6 +337,7 @@ public class HBaseRPC {
                                                InetSocketAddress addr,
                                                Configuration conf,
                                                int maxAttempts,
+                                               int rpcTimeout,
                                                long timeout
                                                ) throws IOException {
     // HBase does limited number of reconnects which is different from hadoop.
@@ -342,7 +346,7 @@ public class HBaseRPC {
     int reconnectAttempts = 0;
     while (true) {
       try {
-        return getProxy(protocol, clientVersion, addr, conf);
+        return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
       } catch(ConnectException se) {  // namenode has not been started
         ioe = se;
         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
@@ -379,13 +383,15 @@ public class HBaseRPC {
    * @param addr remote address
    * @param conf configuration
    * @param factory socket factory
+   * @param rpcTimeout timeout for each RPC
    * @return proxy
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    return getProxy(protocol, clientVersion, addr, null, conf, factory,
+        rpcTimeout);
   }
 
   /**
@@ -398,17 +404,18 @@ public class HBaseRPC {
    * @param ticket ticket
    * @param conf configuration
    * @param factory socket factory
+   * @param rpcTimeout timeout for each RPC
    * @return proxy
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory)
+      Configuration conf, SocketFactory factory, int rpcTimeout)
   throws IOException {
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(addr, ticket, conf, factory));
+            new Invoker(addr, ticket, conf, factory, rpcTimeout));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(),
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -425,15 +432,17 @@ public class HBaseRPC {
    * @param clientVersion version we are expecting
    * @param addr remote address
    * @param conf configuration
+   * @param rpcTimeout timeout for each RPC
    * @return a proxy instance
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf)
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      int rpcTimeout)
       throws IOException {
 
     return getProxy(protocol, clientVersion, addr, conf, NetUtils
-        .getDefaultSocketFactory(conf));
+        .getDefaultSocketFactory(conf), rpcTimeout);
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Nov  1 18:01:57 2010
@@ -240,7 +240,7 @@ public class HRegionServer implements HR
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
 
-  private final long rpcTimeout;
+  private final int rpcTimeout;
 
   // The main region server thread.
   @SuppressWarnings("unused")
@@ -292,9 +292,9 @@ public class HRegionServer implements HR
     this.numRegionsToReport = conf.getInt(
         "hbase.regionserver.numregionstoreport", 10);
 
-    this.rpcTimeout = conf.getLong(
-        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+    this.rpcTimeout = conf.getInt(
+        HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
     this.abortRequested = false;
     this.stopped = false;
@@ -1363,7 +1363,7 @@ public class HRegionServer implements HR
         master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
             HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
             masterAddress.getInetSocketAddress(), this.conf, -1,
-            this.rpcTimeout);
+            this.rpcTimeout, this.rpcTimeout);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();



Mime
View raw message