hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1127680 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/util/
Date Wed, 25 May 2011 20:50:32 GMT
Author: stack
Date: Wed May 25 20:50:32 2011
New Revision: 1127680

URL: http://svn.apache.org/viewvc?rev=1127680&view=rev
Log:
HBASE-2937 Facilitate Timeouts In HBase Client

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed May 25 20:50:32 2011
@@ -231,6 +231,7 @@ Release 0.91.0 - Unreleased
    HBASE-3811  Allow adding attributes to Scan (Alex Baranau)
    HBASE-3841  HTable and HTableInterface docs are inconsistent with
                one another (Harsh J Chouraria)
+   HBASE-2937  Facilitate Timeouts In HBase Client (Karthick Sankarachary)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed May 25 20:50:32 2011
@@ -155,6 +155,12 @@ public final class HConstants {
   /** Parameter name for HBase client IPC pool size */
   public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size";
 
+  /** Parameter name for HBase client operation timeout, which overrides RPC timeout */
+  public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
+
+  /** Default HBase client operation timeout, which is tantamount to a blocking call */
+  public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
+
   /** Used to construct the name of the log directory for a region server
    * Use '.' as a special character to seperate the log files from table data */
   public static final String HREGION_LOGDIR_NAME = ".logs";

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed May 25 20:50:32 2011
@@ -1269,14 +1269,18 @@ public class HConnectionManager {
       for(int tries = 0; tries < numRetries; tries++) {
         try {
           callable.instantiateServer(tries != 0);
+          callable.beforeCall();
           return callable.call();
         } catch (Throwable t) {
+          callable.shouldRetry(t);
           t = translateException(t);
           exceptions.add(t);
           if (tries == numRetries - 1) {
             throw new RetriesExhaustedException(callable.getServerName(),
                 callable.getRegionName(), callable.getRow(), tries, exceptions);
           }
+        } finally {
+          callable.afterCall();
         }
         try {
           Thread.sleep(getPauseTime(tries));
@@ -1292,6 +1296,7 @@ public class HConnectionManager {
         throws IOException, RuntimeException {
       try {
         callable.instantiateServer(false);
+        callable.beforeCall();
         return callable.call();
       } catch (Throwable t) {
         Throwable t2 = translateException(t);
@@ -1300,6 +1305,8 @@ public class HConnectionManager {
         } else {
           throw new RuntimeException(t2);
         }
+      } finally {
+        callable.afterCall();
       }
     }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed May 25 20:50:32 2011
@@ -111,6 +111,7 @@ public class HTable implements HTableInt
   private ExecutorService pool;  // For Multi
   private long maxScannerResultSize;
   private boolean closed;
+  private int operationTimeout;
 
   /**
    * Creates an object to access a HBase table.
@@ -180,6 +181,9 @@ public class HTable implements HTableInt
     this.connection = HConnectionManager.getConnection(conf);
     this.scannerTimeout =
       (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+    this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
+        : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.configuration = conf;
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
     this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
@@ -549,7 +553,7 @@ public class HTable implements HTableInt
    public Result getRowOrBefore(final byte[] row, final byte[] family)
    throws IOException {
      return connection.getRegionServerWithRetries(
-         new ServerCallable<Result>(connection, tableName, row) {
+         new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
        public Result call() throws IOException {
          return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
            row, family);
@@ -594,7 +598,7 @@ public class HTable implements HTableInt
   @Override
   public Result get(final Get get) throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, get.getRow()) {
+        new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
           public Result call() throws IOException {
             return server.get(location.getRegionInfo().getRegionName(), get);
           }
@@ -650,7 +654,7 @@ public class HTable implements HTableInt
   public void delete(final Delete delete)
   throws IOException {
     connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, delete.getRow()) {
+        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
           public Boolean call() throws IOException {
             server.delete(location.getRegionInfo().getRegionName(), delete);
             return null; // FindBugs NP_BOOLEAN_RETURN_NULL
@@ -720,7 +724,7 @@ public class HTable implements HTableInt
           "Invalid arguments to increment, no columns specified");
     }
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, increment.getRow()) {
+        new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
           public Result call() throws IOException {
             return server.increment(
                 location.getRegionInfo().getRegionName(), increment);
@@ -757,7 +761,7 @@ public class HTable implements HTableInt
           "Invalid arguments to incrementColumnValue", npe);
     }
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Long>(connection, tableName, row) {
+        new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
           public Long call() throws IOException {
             return server.incrementColumnValue(
                 location.getRegionInfo().getRegionName(), row, family,
@@ -776,7 +780,7 @@ public class HTable implements HTableInt
       final Put put)
   throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row) {
+        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
           public Boolean call() throws IOException {
             return server.checkAndPut(location.getRegionInfo().getRegionName(),
                 row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
@@ -795,7 +799,7 @@ public class HTable implements HTableInt
       final Delete delete)
   throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row) {
+        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
           public Boolean call() throws IOException {
             return server.checkAndDelete(
                 location.getRegionInfo().getRegionName(),
@@ -812,7 +816,7 @@ public class HTable implements HTableInt
   @Override
   public boolean exists(final Get get) throws IOException {
     return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, get.getRow()) {
+        new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
           public Boolean call() throws IOException {
             return server.
                 exists(location.getRegionInfo().getRegionName(), get);
@@ -876,7 +880,7 @@ public class HTable implements HTableInt
   public RowLock lockRow(final byte [] row)
   throws IOException {
     return connection.getRegionServerWithRetries(
-      new ServerCallable<RowLock>(connection, tableName, row) {
+      new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
         public RowLock call() throws IOException {
           long lockId =
               server.lockRow(location.getRegionInfo().getRegionName(), row);
@@ -893,7 +897,7 @@ public class HTable implements HTableInt
   public void unlockRow(final RowLock rl)
   throws IOException {
     connection.getRegionServerWithRetries(
-      new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
+      new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
         public Boolean call() throws IOException {
           server.unlockRow(location.getRegionInfo().getRegionName(),
               rl.getLockId());
@@ -1477,4 +1481,12 @@ public class HTable implements HTableInt
     return rangeKeys;
   }
 
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
+  public int getOperationTimeout() {
+    return operationTimeout;
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed May 25 20:50:32 2011
@@ -20,10 +20,17 @@
 
 package org.apache.hadoop.hbase.client;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.retry.RetryPolicy;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.util.concurrent.Callable;
 
 /**
@@ -36,6 +43,8 @@ public abstract class ServerCallable<T> 
   protected final byte [] row;
   protected HRegionLocation location;
   protected HRegionInterface server;
+  protected int callTimeout;
+  protected long startTime, endTime;
 
   /**
    * @param connection connection callable is on
@@ -43,11 +52,15 @@ public abstract class ServerCallable<T> 
    * @param row row we are querying
    */
   public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
+    this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+  }
+
+  public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) {
     this.connection = connection;
     this.tableName = tableName;
     this.row = row;
+    this.callTimeout = callTimeout;
   }
-
   /**
    *
    * @param reload set this to true if connection should re-find the region
@@ -78,4 +91,28 @@ public abstract class ServerCallable<T> 
   public byte [] getRow() {
     return row;
   }
+
+  public void beforeCall() {
+    HBaseRPC.setRpcTimeout(this.callTimeout);
+    this.startTime = System.currentTimeMillis();
+  }
+
+  public void afterCall() {
+    HBaseRPC.resetRpcTimeout();
+    this.endTime = System.currentTimeMillis();
+  }
+
+  public void shouldRetry(Throwable throwable) throws IOException {
+    if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
+      if (throwable instanceof SocketTimeoutException
+          || (this.endTime - this.startTime > this.callTimeout)) {
+        throw (SocketTimeoutException) (SocketTimeoutException) new SocketTimeoutException(
+            "Call to access row '" + Bytes.toString(row) + "' on table '"
+                + Bytes.toString(tableName)
+                + "' failed on socket timeout exception: " + throwable)
+            .initCause(throwable);
+      } else {
+        this.callTimeout = ((int) (this.endTime - this.startTime));
+      }
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed May 25 20:50:32 2011
@@ -36,6 +36,7 @@ import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +73,7 @@ public class HBaseClient {
 
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
-  protected final Map<ConnectionId, Connection> connections;
+  protected final PoolMap<ConnectionId, Connection> connections;
 
   protected final Class<? extends Writable> valueClass;   // class of call values
   protected int counter;                            // counter for call ids
@@ -560,9 +561,25 @@ public class HBaseClient {
         } else {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
-          call.setValue(value);
+          // it's possible that this call may have been cleaned up due to a RPC
+          // timeout, so check if it still exists before setting the value.
+          if (call != null) {
+            call.setValue(value);
+          }
           calls.remove(id);
         }
+      } catch (SocketTimeoutException ste) {
+        if (remoteId.rpcTimeout > 0) {
+          // Clean up open calls but don't treat this as a fatal condition,
+          // since we expect certain responses to not make it by the specified
+          // {@link ConnectionId#rpcTimeout}.
+          closeException = ste;
+          cleanupCalls();
+        } else {
+          // Since the server did not respond within the default ping interval
+          // time, treat this as a fatal condition and close this connection
+          markClosed(ste);
+        }
       } catch (IOException e) {
         markClosed(e);
       }
@@ -585,9 +602,7 @@ public class HBaseClient {
       // release the resources
       // first thing to do;take the connection out of the connection list
       synchronized (connections) {
-        if (connections.get(remoteId) == this) {
-          connections.remove(remoteId);
-        }
+        connections.remove(remoteId, this);
       }
 
       // close the streams and therefore the socket
@@ -624,6 +639,10 @@ public class HBaseClient {
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
         c.setException(closeException); // local exception
+        // Notify the open calls, so they are aware of what just happened
+        synchronized (c) {
+          c.notifyAll();
+        }
         itor.remove();
       }
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed May 25 20:50:32 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.VersionedProtocol;
@@ -87,6 +88,14 @@ public class HBaseRPC {
   private static final Map<Class,RpcEngine> PROXY_ENGINES
     = new HashMap<Class,RpcEngine>();
 
+  // thread-specific RPC timeout, which may override that of RpcEngine
+  private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
+    @Override
+      protected Integer initialValue() {
+        return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+      }
+    };
+
   // set a protocol to use a non-default RpcEngine
   static void setProtocolEngine(Configuration conf,
                                 Class protocol, Class engine) {
@@ -285,7 +294,7 @@ public class HBaseRPC {
   throws IOException {
     VersionedProtocol proxy =
         getProtocolEngine(protocol,conf)
-            .getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout);
+            .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(),
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -379,4 +388,16 @@ public class HBaseRPC {
     return getProtocolEngine(protocol, conf)
         .getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
   }
+
+  public static void setRpcTimeout(int rpcTimeout) {
+    HBaseRPC.rpcTimeout.set(rpcTimeout);
+  }
+
+  public static int getRpcTimeout() {
+    return HBaseRPC.rpcTimeout.get();
+  }
+
+  public static void resetRpcTimeout() {
+    HBaseRPC.rpcTimeout.remove();
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java Wed May 25 20:50:32 2011
@@ -77,11 +77,12 @@ public class PoolMap<K, V> implements Ma
     return pool != null ? pool.put(value) : null;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public V remove(Object key) {
     Pool<V> pool = pools.remove(key);
     if (pool != null) {
-      pool.clear();
+      remove((K) key, pool.get());
     }
     return null;
   }



Mime
View raw message