hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [1/2] hbase git commit: HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout
Date Sun, 07 Aug 2016 00:19:30 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 6b233c433 -> ec99838b9
  refs/heads/master 4e08a8bec -> 30d7eeaef


HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Amending-Author: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec99838b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec99838b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec99838b

Branch: refs/heads/branch-1
Commit: ec99838b9c5ac931b16e7cc3c7c2b7bf5b024c0c
Parents: 6b233c4
Author: Vivek <vkoppuru@salesforce.com>
Authored: Fri Aug 5 17:25:06 2016 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Sat Aug 6 10:38:41 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  6 +-
 .../hbase/client/BufferedMutatorImpl.java       |  8 +-
 .../hadoop/hbase/client/ConnectionManager.java  |  7 +-
 .../org/apache/hadoop/hbase/client/HTable.java  | 66 +++++++++----
 .../hadoop/hbase/client/HTableMultiplexer.java  |  6 +-
 .../apache/hadoop/hbase/client/HTablePool.java  | 28 +++++-
 .../org/apache/hadoop/hbase/client/Table.java   | 46 ++++++++-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 11 ++-
 .../org/apache/hadoop/hbase/HConstants.java     | 13 +++
 .../hadoop/hbase/rest/client/RemoteHTable.java  | 28 +++++-
 .../hadoop/hbase/client/HTableWrapper.java      | 22 ++++-
 .../hbase/client/HConnectionTestingUtility.java |  4 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java | 99 +++++++++++++++++---
 13 files changed, 291 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 209ec0e..780de18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -280,7 +280,8 @@ class AsyncProcess {
   }
 
   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
-      RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
+      RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory,
+      int rpcTimeout) {
     if (hc == null) {
       throw new IllegalArgumentException("HConnection cannot be null.");
     }
@@ -295,8 +296,7 @@ class AsyncProcess {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    this.timeout = rpcTimeout;
     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
 
     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 273f2e4..37a38be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -71,6 +72,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private final int maxKeyValueSize;
   private boolean closed = false;
   private final ExecutorService pool;
+  private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
 
   @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
@@ -93,8 +95,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
     this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
         params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
 
+    this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
     // puts need to track errors globally due to how the APIs currently work.
-    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
+    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 96ebebc..50ac87d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -560,6 +560,7 @@ class ConnectionManager {
     private final boolean useMetaReplicas;
     private final int numTries;
     final int rpcTimeout;
+    final int writeRpcTimeout;
     private NonceGenerator nonceGenerator = null;
     private final AsyncProcess asyncProcess;
     // single tracker per connection
@@ -654,6 +655,9 @@ class ConnectionManager {
       this.rpcTimeout = conf.getInt(
           HConstants.HBASE_RPC_TIMEOUT_KEY,
           HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+      this.writeRpcTimeout = conf.getInt(
+        HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
       if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
         synchronized (nonceGeneratorCreateLock) {
           if (ConnectionManager.nonceGenerator == null) {
@@ -2340,7 +2344,8 @@ class ConnectionManager {
     // For tests to override.
     protected AsyncProcess createAsyncProcess(Configuration conf) {
       // No default pool available.
-      return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
+      return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
+        writeRpcTimeout);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 4cd11d0..a793f3e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -125,7 +125,8 @@ public class HTable implements HTableInterface, RegionLocator {
   protected long scannerMaxResultSize;
   private ExecutorService pool;  // For Multi & Scan
   private int operationTimeout; // global timeout for each blocking method with retrying rpc
-  private int rpcTimeout; // timeout for each rpc request
+  private int readRpcTimeout; // timeout for each read rpc request
+  private int writeRpcTimeout; // timeout for each write rpc request
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
   private final boolean cleanupConnectionOnClose; // close the connection in close()
   private Consistency defaultConsistency = Consistency.STRONG;
@@ -356,8 +357,12 @@ public class HTable implements HTableInterface, RegionLocator {
     }
     this.operationTimeout = tableName.isSystemTable() ?
         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
-    this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     this.scannerCaching = connConfiguration.getScannerCaching();
     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
     if (this.rpcCallerFactory == null) {
@@ -573,7 +578,7 @@ public class HTable implements HTableInterface, RegionLocator {
   @Override
   public HTableDescriptor getTableDescriptor() throws IOException {
     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
-      rpcControllerFactory, operationTimeout, rpcTimeout);
+      rpcControllerFactory, operationTimeout, readRpcTimeout);
     if (htd != null) {
       return new UnmodifyableHTableDescriptor(htd);
     }
@@ -755,7 +760,7 @@ public class HTable implements HTableInterface, RegionLocator {
          }
        }
      };
-     return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
+     return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
          this.operationTimeout);
    }
 
@@ -861,7 +866,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
+      return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
           this.operationTimeout);
     }
 
@@ -978,7 +983,7 @@ public class HTable implements HTableInterface, RegionLocator {
         }
       }
     };
-    rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+    rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1108,7 +1113,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1140,7 +1145,7 @@ public class HTable implements HTableInterface, RegionLocator {
         }
       }
     };
-    return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1211,7 +1216,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1241,7 +1246,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1272,7 +1277,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1302,7 +1307,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1333,7 +1338,7 @@ public class HTable implements HTableInterface, RegionLocator {
           }
         }
       };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -1806,12 +1811,35 @@ public class HTable implements HTableInterface, RegionLocator {
     return operationTimeout;
   }
 
-  @Override public void setRpcTimeout(int rpcTimeout) {
-    this.rpcTimeout = rpcTimeout;
+  @Override
+  @Deprecated
+  public int getRpcTimeout() {
+    return readRpcTimeout;
+  }
+
+  @Override
+  @Deprecated
+  public void setRpcTimeout(int rpcTimeout) {
+    this.readRpcTimeout = rpcTimeout;
+    this.writeRpcTimeout = rpcTimeout;
   }
 
-  @Override public int getRpcTimeout() {
-    return rpcTimeout;
+  @Override
+  public int getWriteRpcTimeout() {
+    return writeRpcTimeout;
+  }
+
+  @Override
+  public void setWriteRpcTimeout(int writeRpcTimeout) {
+    this.writeRpcTimeout = writeRpcTimeout;
+  }
+
+  @Override
+  public int getReadRpcTimeout() { return readRpcTimeout; }
+
+  @Override
+  public void setReadRpcTimeout(int readRpcTimeout) {
+    this.readRpcTimeout = readRpcTimeout;
   }
 
   @Override
@@ -1891,7 +1919,7 @@ public class HTable implements HTableInterface, RegionLocator {
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
-            true, RpcControllerFactory.instantiate(configuration));
+            true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
 
     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index dfb0104..7b2b136 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -441,6 +441,7 @@ public class HTableMultiplexer {
     private final ScheduledExecutorService executor;
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
+    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
 
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@@ -450,7 +451,10 @@ public class HTableMultiplexer {
       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
       RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
       RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
+      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
index 2d18367..502703b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
@@ -678,12 +678,36 @@ public class HTablePool implements Closeable {
       return table.getOperationTimeout();
     }
 
-    @Override public void setRpcTimeout(int rpcTimeout) {
+    @Override
+    @Deprecated
+    public void setRpcTimeout(int rpcTimeout) {
       table.setRpcTimeout(rpcTimeout);
     }
 
-    @Override public int getRpcTimeout() {
+    @Override
+    @Deprecated
+    public int getRpcTimeout() {
       return table.getRpcTimeout();
     }
+
+    @Override
+    public int getReadRpcTimeout() {
+      return table.getReadRpcTimeout();
+    }
+
+    @Override
+    public void setReadRpcTimeout(int readRpcTimeout) {
+      table.setReadRpcTimeout(readRpcTimeout);
+    }
+
+    @Override
+    public int getWriteRpcTimeout() {
+      return table.getWriteRpcTimeout();
+    }
+
+    @Override
+    public void setWriteRpcTimeout(int writeRpcTimeout) {
+      table.setWriteRpcTimeout(writeRpcTimeout);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 8c6169d..1a6572b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -614,16 +614,56 @@ public interface Table extends Closeable {
   public int getOperationTimeout();
 
   /**
+   * Get timeout (millisecond) of each rpc request in this Table instance.
+   *
+   * @returns Currently configured read timeout
+   * @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead
+   */
+  @Deprecated
+  int getRpcTimeout();
+
+  /**
    * Set timeout (millisecond) of each rpc request in operations of this Table instance, will
    * override the value of hbase.rpc.timeout in configuration.
    * If a rpc request waiting too long, it will stop waiting and send a new request to retry until
    * retries exhausted or operation timeout reached.
+   * <p>
+   * NOTE: This will set both the read and write timeout settings to the provided value.
+   *
    * @param rpcTimeout the timeout of each rpc request in millisecond.
+   *
+   * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
    */
-  public void setRpcTimeout(int rpcTimeout);
+  @Deprecated
+  void setRpcTimeout(int rpcTimeout);
 
   /**
-   * Get timeout (millisecond) of each rpc request in this Table instance.
+   * Get timeout (millisecond) of each rpc read request in this Table instance.
+   */
+  int getReadRpcTimeout();
+
+  /**
+   * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will
+   * override the value of hbase.rpc.read.timeout in configuration.
+   * If a rpc read request waiting too long, it will stop waiting and send a new request to retry
+   * until retries exhausted or operation timeout reached.
+   *
+   * @param readRpcTimeout
+   */
+  void setReadRpcTimeout(int readRpcTimeout);
+
+  /**
+   * Get timeout (millisecond) of each rpc write request in this Table instance.
+   */
+  int getWriteRpcTimeout();
+
+  /**
+   * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will
+   * override the value of hbase.rpc.write.timeout in configuration.
+   * If a rpc write request waiting too long, it will stop waiting and send a new request to retry
+   * until retries exhausted or operation timeout reached.
+   *
+   * @param writeRpcTimeout
    */
-  public int getRpcTimeout();
+  void setWriteRpcTimeout(int writeRpcTimeout);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 3a0c08d..8fb2e8b 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -135,6 +135,7 @@ public class TestAsyncProcess {
     final AtomicInteger nbActions = new AtomicInteger();
     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
     public AtomicInteger callsCt = new AtomicInteger();
+    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
     @Override
     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -155,14 +156,14 @@ public class TestAsyncProcess {
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
           new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
-            new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+            new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
     }
 
     public MyAsyncProcess(
         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@@ -174,7 +175,7 @@ public class TestAsyncProcess {
           throw new RejectedExecutionException("test under failure");
         }
       },
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
     }
 
     @Override
@@ -1115,10 +1116,12 @@ public class TestAsyncProcess {
   }
 
   static class AsyncProcessForThrowableCheck extends AsyncProcess {
+    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
     public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
         ExecutorService pool) {
       super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
-          conf));
+          conf), rpcTimeout);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8cb937a..654c012 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -799,10 +799,23 @@ public final class HConstants {
 
   /**
    * timeout for each RPC
+   * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
+   * instead.
    */
+  @Deprecated
   public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
 
   /**
+   * timeout for each read RPC
+   */
+  public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout";
+
+  /**
+   * timeout for each write RPC
+   */
+  public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout";
+
+  /**
    * Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
    */
   public static final int DEFAULT_HBASE_RPC_TIMEOUT = 60000;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index 8fa1b8a..63b18b1 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -866,11 +866,35 @@ public class RemoteHTable implements Table {
     throw new UnsupportedOperationException();
   }
 
-  @Override public void setRpcTimeout(int rpcTimeout) {
+  @Override
+  @Deprecated
+  public void setRpcTimeout(int rpcTimeout) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  @Deprecated
+  public int getRpcTimeout() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getReadRpcTimeout() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setReadRpcTimeout(int readRpcTimeout) {
     throw new UnsupportedOperationException();
   }
 
-  @Override public int getRpcTimeout() {
+  @Override
+  public int getWriteRpcTimeout() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setWriteRpcTimeout(int writeRpcTimeout) {
     throw new UnsupportedOperationException();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
index 2d25f63..dc27240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
@@ -374,11 +374,25 @@ public class HTableWrapper implements HTableInterface {
     return table.getOperationTimeout();
   }
 
-  @Override public void setRpcTimeout(int rpcTimeout) {
+  @Override
+  @Deprecated
+  public int getRpcTimeout() { return table.getRpcTimeout(); }
+
+  @Override
+  @Deprecated
+  public void setRpcTimeout(int rpcTimeout) {
     table.setRpcTimeout(rpcTimeout);
   }
 
-  @Override public int getRpcTimeout() {
-    return table.getRpcTimeout();
-  }
+  @Override
+  public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); }
+
+  @Override
+  public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); }
+
+  @Override
+  public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); }
+
+  @Override
+  public int getReadRpcTimeout() { return table.getReadRpcTimeout(); }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 1a7c2ef..8cadbc8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -154,7 +155,8 @@ public class HConnectionTestingUtility {
     Mockito.when(c.getNonceGenerator()).thenReturn(ng);
     Mockito.when(c.getAsyncProcess()).thenReturn(
       new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
-          RpcControllerFactory.instantiate(conf)));
+        RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
     Mockito.doNothing().when(c).incCount();
     Mockito.doNothing().when(c).decCount();
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 0d9efa2..443cd12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,12 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -155,6 +150,16 @@ public class TestHCM {
     }
   }
 
+  public static class SleepWriteCoprocessor extends BaseRegionObserver {
+    public static final int SLEEP_TIME = 5000;
+    @Override
+    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+                               final Increment increment) throws IOException {
+      Threads.sleep(SLEEP_TIME);
+      return super.preIncrement(e, increment);
+    }
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
@@ -348,18 +353,88 @@ public class TestHCM {
     }
   }
 
-  @Test(expected = RetriesExhaustedException.class)
+  @Test
   public void testRpcTimeout() throws Exception {
     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
     hdt.addCoprocessor(SleepCoprocessor.class.getName());
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
 
     try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
-      assert t instanceof HTable;
-      HTable table = (HTable) t;
-      table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
-      table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
-      table.get(new Get(FAM_NAM));
+      t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      t.get(new Get(FAM_NAM));
+      fail("Get should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+    // Again, with configuration based override
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
+    try (Connection conn = ConnectionFactory.createConnection(c)) {
+      try (Table t = conn.getTable(hdt.getTableName())) {
+        t.get(new Get(FAM_NAM));
+        fail("Get should not have succeeded");
+      } catch (RetriesExhaustedException e) {
+        // expected
+      }
+    }
+  }
+
+  @Test
+  public void testWriteRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
+    hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
+      Increment i = new Increment(FAM_NAM);
+      i.addColumn(FAM_NAM, FAM_NAM, 1);
+      t.increment(i);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+    // Again, with configuration based override
+    c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
+    try (Connection conn = ConnectionFactory.createConnection(c)) {
+      try (Table t = conn.getTable(hdt.getTableName())) {
+        Increment i = new Increment(FAM_NAM);
+        i.addColumn(FAM_NAM, FAM_NAM, 1);
+        t.increment(i);
+        fail("Write should not have succeeded");
+      } catch (RetriesExhaustedException e) {
+        // expected
+      }
+    }
+  }
+
+  @Test
+  public void testReadRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      t.get(new Get(FAM_NAM));
+      fail("Get should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+    // Again, with configuration based override
+    c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
+    try (Connection conn = ConnectionFactory.createConnection(c)) {
+      try (Table t = conn.getTable(hdt.getTableName())) {
+        t.get(new Get(FAM_NAM));
+        fail("Get should not have succeeded");
+      } catch (RetriesExhaustedException e) {
+        // expected
+      }
     }
   }
 


Mime
View raw message