kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 03/03: KUDU-1868: Part 1: Add timer-based RPC timeouts
Date Tue, 26 Feb 2019 22:05:33 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 877b9f64d8b93353647e7dab2e66b4783322f7ad
Author: Will Berkeley <wdberkeley@gmail.org>
AuthorDate: Fri Jan 25 14:09:54 2019 -0800

    KUDU-1868: Part 1: Add timer-based RPC timeouts
    
    Currently, the Java client requires some kind of event to detect the
    timeout of an RPC: either a response from the server in the chain of
    sub-RPCs or a socket read timeout on the connection. This patch adds a
    timer task to actively time out an RPC once it passes its deadline.
    
    Part 2 will eliminate socket read timeouts from the Java client (except
    possibly in the case of negotiation); that will fully resolve KUDU-1868.
    
    There is one test included, which checks that timeouts occur without an
    "outside stimulus" like a response from the server.
    
    This patch should not degrade the performance of the client. Even though
    every timer task holds a reference to its RPC, when the RPC completes it
    cancels the timer task, which will make the timer release it at the next
    tick. This means the RPC and its task should be available to be GC'd
    after the next tick of the timer.
    
    Change-Id: I8d823b63ac0a41cc5e42b63a7c19e0ef777e1dea
    Reviewed-on: http://gerrit.cloudera.org:8080/12338
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Grant Henke <granthenke@apache.org>
---
 .../org/apache/kudu/client/AlterTableRequest.java  |  12 +-
 .../org/apache/kudu/client/AlterTableResponse.java |   6 +-
 .../org/apache/kudu/client/AsyncKuduClient.java    | 169 ++++++++++++++-------
 .../org/apache/kudu/client/AsyncKuduScanner.java   |   9 +-
 .../org/apache/kudu/client/AsyncKuduSession.java   |  21 ++-
 .../main/java/org/apache/kudu/client/Batch.java    |  28 +++-
 .../java/org/apache/kudu/client/BatchResponse.java |  14 +-
 .../org/apache/kudu/client/ConnectToCluster.java   |  20 +--
 .../apache/kudu/client/ConnectToMasterRequest.java |   7 +-
 .../org/apache/kudu/client/CreateTableRequest.java |  11 +-
 .../apache/kudu/client/CreateTableResponse.java    |   6 +-
 .../org/apache/kudu/client/DeleteTableRequest.java |   8 +-
 .../apache/kudu/client/DeleteTableResponse.java    |   6 +-
 .../kudu/client/GetTableLocationsRequest.java      |  13 +-
 .../apache/kudu/client/GetTableSchemaRequest.java  |   9 +-
 .../apache/kudu/client/GetTableSchemaResponse.java |   6 +-
 .../kudu/client/IsAlterTableDoneRequest.java       |  11 +-
 .../kudu/client/IsCreateTableDoneRequest.java      |  12 +-
 .../main/java/org/apache/kudu/client/KuduRpc.java  |  41 +++--
 .../org/apache/kudu/client/ListTablesRequest.java  |  11 +-
 .../org/apache/kudu/client/ListTablesResponse.java |   4 +-
 .../kudu/client/ListTabletServersRequest.java      |  14 +-
 .../kudu/client/ListTabletServersResponse.java     |   6 +-
 .../org/apache/kudu/client/ListTabletsRequest.java |   8 +-
 .../apache/kudu/client/ListTabletsResponse.java    |   4 +-
 .../java/org/apache/kudu/client/Operation.java     |  27 +++-
 .../org/apache/kudu/client/OperationResponse.java  |  14 +-
 .../java/org/apache/kudu/client/PingRequest.java   |  12 +-
 .../org/apache/kudu/client/RowResultIterator.java  |  19 ++-
 .../main/java/org/apache/kudu/client/RpcProxy.java |   6 +-
 .../apache/kudu/client/TestAsyncKuduSession.java   |   2 +-
 .../apache/kudu/client/TestConnectionCache.java    |   2 +-
 .../java/org/apache/kudu/client/TestTimeouts.java  |  66 +++++++-
 33 files changed, 421 insertions(+), 183 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index b6d8cf7..9e2dfd0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
@@ -43,8 +44,12 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
   private final AlterTableRequestPB.Builder builder;
   private final List<Integer> requiredFeatures;
 
-  AlterTableRequest(KuduTable masterTable, String name, AlterTableOptions ato) {
-    super(masterTable);
+  AlterTableRequest(KuduTable masterTable,
+                    String name,
+                    AlterTableOptions ato,
+                    Timer timer,
+                    long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.name = name;
     this.builder = ato.getProtobuf();
     this.requiredFeatures = ato.hasAddDropRangePartitions() ?
@@ -75,7 +80,8 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
     final AlterTableResponsePB.Builder respBuilder = AlterTableResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
     AlterTableResponse response = new AlterTableResponse(
-        deadlineTracker.getElapsedMillis(), tsUUID,
+        deadlineTracker.getElapsedMillis(),
+        tsUUID,
         respBuilder.hasTableId() ? respBuilder.getTableId().toStringUtf8() : null);
 
     return new Pair<AlterTableResponse, Object>(
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
index 62a3b74..bc4e9cb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
@@ -27,10 +27,10 @@ public class AlterTableResponse extends KuduRpcResponse {
   private String tableId;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  AlterTableResponse(long ellapsedMillis, String tsUUID, String tableId) {
-    super(ellapsedMillis, tsUUID);
+  AlterTableResponse(long elapsedMillis, String tsUUID, String tableId) {
+    super(elapsedMillis, tsUUID);
     this.tableId = tableId;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 494cb4d..ae2482f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -63,6 +63,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioWorkerPool;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
 import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -523,6 +524,16 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Returns the {@link Timer} instance held by this client. This timer should
+   * be used everywhere for scheduling tasks after a delay, e.g., for
+   * timeouts.
+   * @return the timer instance held by this client
+   */
+  Timer getTimer() {
+    return timer;
+  }
+
+  /**
    * Returns a synchronous {@link KuduClient} which wraps this asynchronous client.
    * Calling {@link KuduClient#close} on the returned client will close this client.
    * If this asynchronous client should outlive the returned synchronous client,
@@ -558,9 +569,12 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     // Send the CreateTable RPC.
-    final CreateTableRequest create = new CreateTableRequest(
-        this.masterTable, name, schema, builder);
-    create.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    final CreateTableRequest create = new CreateTableRequest(this.masterTable,
+                                                             name,
+                                                             schema,
+                                                             builder,
+                                                             timer,
+                                                             defaultAdminOperationTimeoutMs);
     Deferred<CreateTableResponse> createTableD = sendRpcToTablet(create);
 
     // Add a callback that converts the response into a KuduTable.
@@ -607,9 +621,10 @@ public class AsyncKuduClient implements AutoCloseable {
       @Nonnull TableIdentifierPB.Builder table,
       @Nullable KuduRpc<?> parent) {
     checkIsClosed();
-    IsCreateTableDoneRequest request = new IsCreateTableDoneRequest(
-        this.masterTable, table);
-    request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    IsCreateTableDoneRequest request = new IsCreateTableDoneRequest(this.masterTable,
+                                                                    table,
+                                                                    timer,
+                                                                    defaultAdminOperationTimeoutMs);
     if (parent != null) {
       request.setParentRpc(parent);
     }
@@ -623,8 +638,10 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<DeleteTableResponse> deleteTable(String name) {
     checkIsClosed();
-    DeleteTableRequest delete = new DeleteTableRequest(this.masterTable, name);
-    delete.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    DeleteTableRequest delete = new DeleteTableRequest(this.masterTable,
+                                                       name,
+                                                       timer,
+                                                       defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(delete);
   }
 
@@ -637,8 +654,11 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<AlterTableResponse> alterTable(String name, AlterTableOptions ato) {
     checkIsClosed();
-    final AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato);
-    alter.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    final AlterTableRequest alter = new AlterTableRequest(this.masterTable,
+                                                          name,
+                                                          ato,
+                                                          timer,
+                                                          defaultAdminOperationTimeoutMs);
     Deferred<AlterTableResponse> responseD = sendRpcToTablet(alter);
 
     if (ato.hasAddDropRangePartitions()) {
@@ -705,12 +725,11 @@ public class AsyncKuduClient implements AutoCloseable {
       @Nonnull TableIdentifierPB.Builder table,
       @Nullable KuduRpc<?> parent) {
     checkIsClosed();
-    IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(
-        this.masterTable, table);
-    request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    if (parent != null) {
-      request.setParentRpc(parent);
-    }
+    IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(this.masterTable,
+                                                                  table,
+                                                                  timer,
+                                                                  defaultAdminOperationTimeoutMs);
+    request.setParentRpc(parent);
     return sendRpcToTablet(request);
   }
 
@@ -720,8 +739,9 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<ListTabletServersResponse> listTabletServers() {
     checkIsClosed();
-    ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable,
+                                                                timer,
+                                                                defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
   }
 
@@ -740,13 +760,13 @@ public class AsyncKuduClient implements AutoCloseable {
     Preconditions.checkNotNull(tableName);
 
     // Prefer a lookup by table ID over name, since the former is immutable.
-    GetTableSchemaRequest rpc = new GetTableSchemaRequest(
-        this.masterTable, tableId, tableId != null ? null : tableName);
+    GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable,
+                                                          tableId,
+                                                          tableId != null ? null : tableName,
+                                                          timer,
+                                                          defaultAdminOperationTimeoutMs);
 
-    if (parent != null) {
-      rpc.setParentRpc(parent);
-    }
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    rpc.setParentRpc(parent);
     return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
       @Override
       public KuduTable call(GetTableSchemaResponse resp) throws Exception {
@@ -784,8 +804,10 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return a deferred that yields the list of table names
    */
   public Deferred<ListTablesResponse> getTablesList(String nameFilter) {
-    ListTablesRequest rpc = new ListTablesRequest(this.masterTable, nameFilter);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    ListTablesRequest rpc = new ListTablesRequest(this.masterTable,
+                                                  nameFilter,
+                                                  timer,
+                                                  defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
   }
 
@@ -970,7 +992,7 @@ public class AsyncKuduClient implements AutoCloseable {
               RpcTraceFrame.Action.SLEEP_THEN_RETRY)
           .callStatus(ex.getStatus())
           .build());
-      newTimeout(retryTask, sleepTime);
+      newTimeout(timer, retryTask, sleepTime);
       return null;
 
       // fakeRpc.Deferred was not invoked; the user continues to wait until
@@ -1347,12 +1369,14 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param method fake RPC method (shows up in RPC traces)
    * @param parent parent RPC (for tracing), if any
    * @param <R> the expected return type of the fake RPC
+   * @param timeoutMs the timeout in milliseconds for the fake RPC
    * @return created fake RPC
    */
   private <R> KuduRpc<R> buildFakeRpc(
       @Nonnull final String method,
-      @Nullable final KuduRpc<?> parent) {
-    KuduRpc<R> rpc = new KuduRpc<R>(null) {
+      @Nullable final KuduRpc<?> parent,
+      long timeoutMs) {
+    KuduRpc<R> rpc = new KuduRpc<R>(null, timer, timeoutMs) {
       @Override
       Message createRequestPB() {
         return null;
@@ -1374,14 +1398,25 @@ public class AsyncKuduClient implements AutoCloseable {
         return null;
       }
     };
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    if (parent != null) {
-      rpc.setParentRpc(parent);
-    }
+    rpc.setParentRpc(parent);
     return rpc;
   }
 
   /**
+   * Creates an RPC that will never be sent, and will instead be used
+   * exclusively for timeouts.
+   * @param method fake RPC method (shows up in RPC traces)
+   * @param parent parent RPC (for tracing), if any
+   * @param <R> the expected return type of the fake RPC
+   * @return created fake RPC
+   */
+  private <R> KuduRpc<R> buildFakeRpc(
+      @Nonnull final String method,
+      @Nullable final KuduRpc<?> parent) {
+    return buildFakeRpc(method, parent, defaultAdminOperationTimeoutMs);
+  }
+
+  /**
    * Schedules a IsAlterTableDone RPC. When the response comes in, if the table
    * is done altering, the RPC's callback chain is triggered with 'resp' as its
    * value. If not, another IsAlterTableDone RPC is scheduled and the cycle
@@ -1550,7 +1585,7 @@ public class AsyncKuduClient implements AutoCloseable {
       tooManyAttemptsOrTimeout(rpc, null);
       return;
     }
-    newTimeout(new RetryTimer(), sleepTimeMillis);
+    newTimeout(timer, new RetryTimer(), sleepTimeMillis);
   }
 
   /**
@@ -1580,7 +1615,7 @@ public class AsyncKuduClient implements AutoCloseable {
       tooManyAttemptsOrTimeout(rpc, null);
       return;
     }
-    newTimeout(new RetryTimer(), sleepTimeMillis);
+    newTimeout(timer, new RetryTimer(), sleepTimeMillis);
   }
 
   private final class ReleaseMasterLookupPermit<T> implements Callback<T, T> {
@@ -1688,15 +1723,18 @@ public class AsyncKuduClient implements AutoCloseable {
     if (isMasterTable(tableId)) {
       d = getMasterTableLocationsPB(parentRpc);
     } else {
+      long timeoutMillis = parentRpc == null ? defaultAdminOperationTimeoutMs :
+                                               parentRpc.deadlineTracker.getMillisBeforeDeadline();
       // Leave the end of the partition key range empty in order to pre-fetch tablet locations.
       GetTableLocationsRequest rpc =
-          new GetTableLocationsRequest(masterTable, partitionKey, null, tableId, fetchBatchSize);
-      if (parentRpc != null) {
-        rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
-        rpc.setParentRpc(parentRpc);
-      } else {
-        rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-      }
+          new GetTableLocationsRequest(masterTable,
+                                       partitionKey,
+                                       null,
+                                       tableId,
+                                       fetchBatchSize,
+                                       timer,
+                                       timeoutMillis);
+      rpc.setParentRpc(parentRpc);
       d = sendRpcToTablet(rpc);
     }
     d.addCallback(new MasterLookupCB(table, partitionKey, fetchBatchSize));
@@ -1823,15 +1861,20 @@ public class AsyncKuduClient implements AutoCloseable {
       final byte[] lookupKey = partitionKey;
 
       // Build a fake RPC to encapsulate and propagate the timeout. There's no actual "RPC" to send.
-      KuduRpc fakeRpc = buildFakeRpc("loopLocateTable", null);
-      fakeRpc.setTimeoutMillis(deadlineTracker.getMillisBeforeDeadline());
+      KuduRpc fakeRpc = buildFakeRpc("loopLocateTable",
+                                     null,
+                                     deadlineTracker.getMillisBeforeDeadline());
 
       return locateTablet(table, key, fetchBatchSize, fakeRpc).addCallbackDeferring(
           new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
             @Override
             public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
-              return loopLocateTable(table, lookupKey, endPartitionKey, fetchBatchSize,
-                                     ret, deadlineTracker);
+              return loopLocateTable(table,
+                                     lookupKey,
+                                     endPartitionKey,
+                                     fetchBatchSize,
+                                     ret,
+                                     deadlineTracker);
             }
 
             @Override
@@ -1865,8 +1908,12 @@ public class AsyncKuduClient implements AutoCloseable {
     final List<LocatedTablet> ret = Lists.newArrayList();
     final DeadlineTracker deadlineTracker = new DeadlineTracker();
     deadlineTracker.setDeadline(deadline);
-    return loopLocateTable(table, startPartitionKey, endPartitionKey, fetchBatchSize,
-                           ret, deadlineTracker);
+    return loopLocateTable(table,
+                           startPartitionKey,
+                           endPartitionKey,
+                           fetchBatchSize,
+                           ret,
+                           deadlineTracker);
   }
 
   /**
@@ -1952,11 +1999,12 @@ public class AsyncKuduClient implements AutoCloseable {
             .build());
 
     long sleepTime = getSleepTimeForRpcMillis(rpc);
-    if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
+    if (cannotRetryRequest(rpc) ||
+        rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
       // Don't let it retry.
       return tooManyAttemptsOrTimeout(rpc, ex);
     }
-    newTimeout(new RetryTimer(), sleepTime);
+    newTimeout(timer, new RetryTimer(), sleepTime);
     return rpc.getDeferred();
   }
 
@@ -2345,16 +2393,29 @@ public class AsyncKuduClient implements AutoCloseable {
     return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
   }
 
-  void newTimeout(final TimerTask task, final long timeoutMs) {
+  /**
+   * Utility function to register a timeout task 'task' on timer 'timer' that
+   * will fire after 'timeoutMillis' milliseconds. Returns a handle to the
+   * scheduled timeout, which can be used to cancel the task and release its
+   * resources.
+   * @param timer the timer on which the task is scheduled
+   * @param task the task that will be run when the timeout hits
+   * @param timeoutMillis the timeout, in milliseconds
+   * @return a handle to the scheduled timeout
+   */
+  static Timeout newTimeout(final Timer timer,
+                            final TimerTask task,
+                            final long timeoutMillis) {
+    Preconditions.checkNotNull(timer);
     try {
-      timer.newTimeout(task, timeoutMs, MILLISECONDS);
+      return timer.newTimeout(task, timeoutMillis, MILLISECONDS);
     } catch (IllegalStateException e) {
       // This can happen if the timer fires just before shutdown()
       // is called from another thread, and due to how threads get
       // scheduled we tried to call newTimeout() after timer.stop().
-      LOG.warn("Failed to schedule timer." +
-          " Ignore this if we're shutting down.", e);
+      LOG.warn("Failed to schedule timer. Ignore this if we're shutting down.", e);
     }
+    return null;
   }
 
   /**
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 36bb0af..d31c0a2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -835,9 +835,8 @@ public final class AsyncKuduScanner {
   final class KeepAliveRequest extends KuduRpc<Void> {
 
     KeepAliveRequest(KuduTable table, RemoteTablet tablet) {
-      super(table);
+      super(table, client.getTimer(), scanRequestTimeout);
       setTablet(tablet);
-      this.setTimeoutMillis(scanRequestTimeout);
     }
 
     @Override
@@ -882,10 +881,9 @@ public final class AsyncKuduScanner {
     State state;
 
     ScanRequest(KuduTable table, State state, RemoteTablet tablet) {
-      super(table);
+      super(table, client.getTimer(), scanRequestTimeout);
       setTablet(tablet);
       this.state = state;
-      this.setTimeoutMillis(scanRequestTimeout);
     }
 
     @Override
@@ -1017,8 +1015,7 @@ public final class AsyncKuduScanner {
         }
       }
       RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
-          deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
-          callResponse);
+          deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(), callResponse);
 
       boolean hasMore = resp.getHasMoreResults();
       if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id)) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index d815fdb..0e174f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -126,7 +126,7 @@ public class AsyncKuduSession implements SessionConfiguration {
   private int mutationBufferLowWatermark;
   private FlushMode flushMode;
   private ExternalConsistencyMode consistencyMode;
-  private long timeoutMs;
+  private long timeoutMillis;
 
   /**
    * Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
@@ -188,7 +188,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     this.client = client;
     flushMode = FlushMode.AUTO_FLUSH_SYNC;
     consistencyMode = CLIENT_PROPAGATED;
-    timeoutMs = client.getDefaultOperationTimeoutMs();
+    timeoutMillis = client.getDefaultOperationTimeoutMs();
     inactiveBuffers.add(bufferA);
     inactiveBuffers.add(bufferB);
     errorCollector = new ErrorCollector(mutationBufferSpace);
@@ -257,12 +257,12 @@ public class AsyncKuduSession implements SessionConfiguration {
 
   @Override
   public void setTimeoutMillis(long timeout) {
-    this.timeoutMs = timeout;
+    this.timeoutMillis = timeout;
   }
 
   @Override
   public long getTimeoutMillis() {
-    return this.timeoutMs;
+    return this.timeoutMillis;
   }
 
   @Override
@@ -390,9 +390,8 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
 
       for (Batch batch : batches.values()) {
-        if (timeoutMs != 0) {
-          batch.deadlineTracker.reset();
-          batch.setTimeoutMillis(timeoutMs);
+        if (timeoutMillis != 0) {
+          batch.resetTimeoutMillis(client.getTimer(), timeoutMillis);
         }
         addBatchCallbacks(batch);
         batchResponses.add(client.sendRpcToTablet(batch));
@@ -544,8 +543,8 @@ public class AsyncKuduSession implements SessionConfiguration {
 
     // If immediate flush mode, send the operation directly.
     if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
-      if (timeoutMs != 0) {
-        operation.setTimeoutMillis(timeoutMs);
+      if (timeoutMillis != 0) {
+        operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
       operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
@@ -568,7 +567,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
                                                               operation.partitionKey(),
                                                               LookupType.POINT,
-                                                              timeoutMs);
+                                                              timeoutMillis);
 
     // Holds a buffer that should be flushed outside the synchronized block, if necessary.
     Buffer fullBuffer = null;
@@ -646,7 +645,7 @@ public class AsyncKuduSession implements SessionConfiguration {
             activeBufferSize = 0;
           } else if (activeBufferSize == 0) {
             // If this is the first operation in the buffer, start a background flush timer.
-            client.newTimeout(activeBuffer.getFlusherTask(), interval);
+            AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(), interval);
           }
         }
       }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index d2034e2..a89374e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.client.Statistics.Statistic;
@@ -57,14 +58,30 @@ class Batch extends KuduRpc<BatchResponse> {
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   private final boolean ignoreAllDuplicateRows;
 
-
   Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows) {
-    super(table);
+    super(table, null, 0);
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
     this.tablet = tablet;
   }
 
   /**
+   * Reset the timeout of this batch.
+   *
+   * TODO(wdberkeley): The fact we have to do this is a sign an Operation should not subclass
+   * KuduRpc.
+   *
+   * @param timeoutMillis the new timeout of the batch in milliseconds
+   */
+  void resetTimeoutMillis(Timer timer, long timeoutMillis) {
+    deadlineTracker.reset();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
+    timeoutTask = AsyncKuduClient.newTimeout(timer, new RpcTimeoutTask(), timeoutMillis);
+  }
+
+  /**
    * Returns the bytes size of this batch's row operations after serialization.
    * @return size in bytes
    * @throws IllegalStateException thrown if this RPC hasn't been serialized eg sent to a TS
@@ -128,8 +145,11 @@ class Batch extends KuduRpc<BatchResponse> {
       }
     }
 
-    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                               builder.getTimestamp(), errorsPB, operations,
+    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(),
+                                               tsUUID,
+                                               builder.getTimestamp(),
+                                               errorsPB,
+                                               operations,
                                                operationIndexes);
 
     if (injectedError != null) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
index a426ac4..408c4cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
@@ -46,9 +46,12 @@ public class BatchResponse extends KuduRpcResponse {
    * @param operations the list of operations which created this response
    * @param indexes the list of operations' order index
    */
-  BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
+  BatchResponse(long elapsedMillis,
+                String tsUUID,
+                long writeTimestamp,
                 List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
-                List<Operation> operations, List<Integer> indexes) {
+                List<Operation> operations,
+                List<Integer> indexes) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     individualResponses = new ArrayList<>(operations.size());
@@ -75,8 +78,11 @@ public class BatchResponse extends KuduRpcResponse {
         currentErrorIndex++;
       }
       individualResponses.add(
-          new OperationResponse(currentOperation.deadlineTracker.getElapsedMillis(), tsUUID,
-              writeTimestamp, currentOperation, rowError));
+          new OperationResponse(currentOperation.deadlineTracker.getElapsedMillis(),
+                                tsUUID,
+                                writeTimestamp,
+                                currentOperation,
+                                rowError));
     }
     assert (rowErrors.size() == errorsPB.size());
     assert (individualResponses.size() == operations.size());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index aecb51f..1ee9767 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -33,6 +33,8 @@ import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.jboss.netty.util.Timer;
+
 import org.apache.kudu.Common.HostPortPB;
 import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
@@ -93,16 +95,14 @@ final class ConnectToCluster {
       final KuduTable masterTable,
       final RpcProxy masterProxy,
       KuduRpc<?> parentRpc,
+      Timer timer,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
-    final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable);
-    if (parentRpc != null) {
-      rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
-      rpc.setParentRpc(parentRpc);
-    } else {
-      rpc.setTimeoutMillis(defaultTimeoutMs);
-    }
+    long timeoutMillis = parentRpc == null ? defaultTimeoutMs :
+                                             parentRpc.deadlineTracker.getMillisBeforeDeadline();
+    final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable, timer, timeoutMillis);
+    rpc.setParentRpc(parentRpc);
     Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
     rpc.attempt++;
     masterProxy.sendRpc(rpc);
@@ -168,10 +168,10 @@ final class ConnectToCluster {
     List<Deferred<ConnectToMasterResponsePB>> deferreds = new ArrayList<>();
     for (HostAndPort hostAndPort : masterAddrs) {
       Deferred<ConnectToMasterResponsePB> d;
-      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(
-          hostAndPort, credentialsPolicy);
+      AsyncKuduClient client = masterTable.getAsyncClient();
+      RpcProxy proxy = client.newMasterRpcProxy(hostAndPort, credentialsPolicy);
       if (proxy != null) {
-        d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+        d = connectToMaster(masterTable, proxy, parentRpc, client.getTimer(), defaultTimeoutMs);
       } else {
         String message = "Couldn't resolve this master's address " + hostAndPort.toString();
         LOG.warn(message);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index cf0a579..f4e2769 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 import org.apache.kudu.master.Master.MasterFeatures;
@@ -51,8 +52,10 @@ public class ConnectToMasterRequest extends KuduRpc<ConnectToMasterResponsePB> {
    */
   private String method = CONNECT_TO_MASTER;
 
-  public ConnectToMasterRequest(KuduTable masterTable) {
-    super(masterTable);
+  public ConnectToMasterRequest(KuduTable masterTable,
+                                Timer timer,
+                                long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     // TODO(todd): get rid of 'masterTable' hack
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index d5616b5..f1d5f20 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
@@ -40,9 +41,13 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
   private final Master.CreateTableRequestPB.Builder builder;
   private final List<Integer> featureFlags;
 
-  CreateTableRequest(KuduTable masterTable, String name, Schema schema,
-                     CreateTableOptions builder) {
-    super(masterTable);
+  CreateTableRequest(KuduTable masterTable,
+                     String name,
+                     Schema schema,
+                     CreateTableOptions builder,
+                     Timer timer,
+                     long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.schema = schema;
     this.name = name;
     this.builder = builder.getBuilder();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
index 6f4427f..8cf41bc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
@@ -24,10 +24,10 @@ public class CreateTableResponse extends KuduRpcResponse {
   private final String tableId;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  CreateTableResponse(long ellapsedMillis, String tsUUID, String tableId) {
-    super(ellapsedMillis, tsUUID);
+  CreateTableResponse(long elapsedMillis, String tsUUID, String tableId) {
+    super(elapsedMillis, tsUUID);
     this.tableId = tableId;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
index b7e9441..80c207d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -33,8 +34,11 @@ class DeleteTableRequest extends KuduRpc<DeleteTableResponse> {
 
   private final String name;
 
-  DeleteTableRequest(KuduTable table, String name) {
-    super(table);
+  DeleteTableRequest(KuduTable table,
+                     String name,
+                     Timer timer,
+                     long timeoutMillis) {
+    super(table, timer, timeoutMillis);
     this.name = name;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
index a99d68a..403ef41 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
@@ -25,9 +25,9 @@ import org.apache.yetus.audience.InterfaceStability;
 public class DeleteTableResponse extends KuduRpcResponse {
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  DeleteTableResponse(long ellapsedMillis, String tsUUID) {
-    super(ellapsedMillis, tsUUID);
+  DeleteTableResponse(long elapsedMillis, String tsUUID) {
+    super(elapsedMillis, tsUUID);
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index 796c3db..a7fe825 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -21,6 +21,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -36,10 +37,14 @@ class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponseP
   private final String tableId;
   private final int maxReturnedLocations;
 
-  GetTableLocationsRequest(KuduTable table, byte[] startPartitionKey,
-                           byte[] endPartitionKey, String tableId,
-                           int maxReturnedLocations) {
-    super(table);
+  GetTableLocationsRequest(KuduTable table,
+                           byte[] startPartitionKey,
+                           byte[] endPartitionKey,
+                           String tableId,
+                           int maxReturnedLocations,
+                           Timer timer,
+                           long timeoutMillis) {
+    super(table, timer, timeoutMillis);
     if (startPartitionKey != null && endPartitionKey != null &&
         Bytes.memcmp(startPartitionKey, endPartitionKey) > 0) {
       throw new IllegalArgumentException(
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index ec4afef..93671bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master.TableIdentifierPB.Builder;
@@ -39,8 +40,12 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
   private final String name;
 
 
-  GetTableSchemaRequest(KuduTable masterTable, String id, String name) {
-    super(masterTable);
+  GetTableSchemaRequest(KuduTable masterTable,
+                        String id,
+                        String name,
+                        Timer timer,
+                        long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     Preconditions.checkArgument(id != null ^ name != null,
         "Only one of table ID or the table name should be provided");
     this.id = id;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
index a426768..a3d10ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
@@ -30,20 +30,20 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   private final int numReplicas;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now
+   * @param elapsedMillis Time in milliseconds since RPC creation to now
    * @param tsUUID the UUID of the tablet server that sent the response
    * @param schema the table's schema
    * @param tableId the UUID of the table in the response
    * @param numReplicas the table's replication factor
    * @param partitionSchema the table's partition schema
    */
-  GetTableSchemaResponse(long ellapsedMillis,
+  GetTableSchemaResponse(long elapsedMillis,
                          String tsUUID,
                          Schema schema,
                          String tableId,
                          int numReplicas,
                          PartitionSchema partitionSchema) {
-    super(ellapsedMillis, tsUUID);
+    super(elapsedMillis, tsUUID);
     this.schema = schema;
     this.partitionSchema = partitionSchema;
     this.tableId = tableId;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
index 73c4972..2866faf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -23,6 +23,7 @@ import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
@@ -33,8 +34,11 @@ import org.apache.kudu.util.Pair;
 class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse> {
   private final TableIdentifierPB.Builder tableId;
 
-  IsAlterTableDoneRequest(KuduTable masterTable, TableIdentifierPB.Builder tableId) {
-    super(masterTable);
+  IsAlterTableDoneRequest(KuduTable masterTable,
+                          TableIdentifierPB.Builder tableId,
+                          Timer timer,
+                          long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.tableId = tableId;
   }
 
@@ -62,7 +66,8 @@ class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse> {
     final IsAlterTableDoneResponsePB.Builder respBuilder = IsAlterTableDoneResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
     IsAlterTableDoneResponse resp = new IsAlterTableDoneResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID, respBuilder.getDone());
+                                                                 tsUUID,
+                                                                 respBuilder.getDone());
     return new Pair<IsAlterTableDoneResponse, Object>(
         resp, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
index c08360b..2fd0290 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -19,6 +19,8 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
+
 import org.apache.kudu.master.Master.IsCreateTableDoneRequestPB;
 import org.apache.kudu.master.Master.IsCreateTableDoneResponsePB;
 import org.apache.kudu.master.Master.TableIdentifierPB;
@@ -31,8 +33,11 @@ import org.apache.kudu.util.Pair;
 class IsCreateTableDoneRequest extends KuduRpc<IsCreateTableDoneResponse> {
   private final TableIdentifierPB.Builder tableId;
 
-  IsCreateTableDoneRequest(KuduTable masterTable, TableIdentifierPB.Builder tableId) {
-    super(masterTable);
+  IsCreateTableDoneRequest(KuduTable masterTable,
+                           TableIdentifierPB.Builder tableId,
+                           Timer timer,
+                           long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.tableId = tableId;
   }
 
@@ -54,7 +59,8 @@ class IsCreateTableDoneRequest extends KuduRpc<IsCreateTableDoneResponse> {
     readProtobuf(callResponse.getPBMessage(), builder);
     IsCreateTableDoneResponse resp =
         new IsCreateTableDoneResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID, builder.getDone());
+                                      tsUUID,
+                                      builder.getDone());
     return new Pair<IsCreateTableDoneResponse, Object>(
         resp, builder.hasError() ? builder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 8fc011a..3d212ee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -43,6 +43,9 @@ import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,8 +132,12 @@ public abstract class KuduRpc<R> {
 
   final DeadlineTracker deadlineTracker;
 
-  protected long propagatedTimestamp = -1;
-  protected ExternalConsistencyMode externalConsistencyMode = CLIENT_PROPAGATED;
+  // 'timeoutTask' is a handle to the timer task that will time out the RPC. It is
+  // null if and only if the RPC has no timeout.
+  Timeout timeoutTask;
+
+  long propagatedTimestamp = -1;
+  ExternalConsistencyMode externalConsistencyMode = CLIENT_PROPAGATED;
 
   /**
    * How many times have we retried this RPC?.
@@ -146,9 +153,15 @@ public abstract class KuduRpc<R> {
    */
   private long sequenceId = RequestTracker.NO_SEQ_NO;
 
-  KuduRpc(KuduTable table) {
+  KuduRpc(KuduTable table, Timer timer, long timeoutMillis) {
     this.table = table;
     this.deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timer != null) {
+      this.timeoutTask = AsyncKuduClient.newTimeout(timer,
+                                                    new RpcTimeoutTask(),
+                                                    timeoutMillis);
+    }
   }
 
   /**
@@ -241,6 +254,9 @@ public abstract class KuduRpc<R> {
       table.getAsyncClient().getRequestTracker().rpcCompleted(sequenceId);
       sequenceId = RequestTracker.NO_SEQ_NO;
     }
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
     deadlineTracker.reset();
     traces.clear();
     parentRpc = null;
@@ -276,8 +292,8 @@ public abstract class KuduRpc<R> {
    * @param parentRpc RPC that will also receive traces from this RPC
    */
   void setParentRpc(KuduRpc<?> parentRpc) {
-    assert (this.parentRpc == null);
-    assert (this.parentRpc != this);
+    assert(this.parentRpc == null);
+    assert(this != parentRpc);
     this.parentRpc = parentRpc;
   }
 
@@ -325,10 +341,6 @@ public abstract class KuduRpc<R> {
     return table;
   }
 
-  void setTimeoutMillis(long timeout) {
-    deadlineTracker.setDeadline(timeout);
-  }
-
   /**
    * If this RPC needs to be tracked on the client and server-side. Some RPCs require exactly-once
    * semantics which is enabled by tracking them.
@@ -420,4 +432,15 @@ public abstract class KuduRpc<R> {
     chanBuf.writerIndex(buf.length);
     return chanBuf;
   }
+
+  /**
+   * A netty TimerTask for timing out a KuduRpc.
+   */
+  final class RpcTimeoutTask implements TimerTask {
+    @Override
+    public void run(final Timeout timeout) {
+      Status statusTimedOut = Status.TimedOut("can not complete before timeout: " + KuduRpc.this);
+      KuduRpc.this.errback(new NonRecoverableException(statusTimedOut));
+    }
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index 8fe19a9..e7416df 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -31,8 +32,11 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
 
   private final String nameFilter;
 
-  ListTablesRequest(KuduTable masterTable, String nameFilter) {
-    super(masterTable);
+  ListTablesRequest(KuduTable masterTable,
+                    String nameFilter,
+                    Timer timer,
+                    long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.nameFilter = nameFilter;
   }
 
@@ -68,7 +72,8 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
       tables.add(info.getName());
     }
     ListTablesResponse response = new ListTablesResponse(deadlineTracker.getElapsedMillis(),
-                                                         tsUUID, tables);
+                                                         tsUUID,
+                                                         tables);
     return new Pair<ListTablesResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
index ca851c4..d7d14e2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
@@ -28,8 +28,8 @@ public class ListTablesResponse extends KuduRpcResponse {
 
   private final List<String> tablesList;
 
-  ListTablesResponse(long ellapsedMillis, String tsUUID, List<String> tablesList) {
-    super(ellapsedMillis, tsUUID);
+  ListTablesResponse(long elapsedMillis, String tsUUID, List<String> tablesList) {
+    super(elapsedMillis, tsUUID);
     this.tablesList = tablesList;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
index c453354..75e62fb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
@@ -25,14 +25,17 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
 @InterfaceAudience.Private
 public class ListTabletServersRequest extends KuduRpc<ListTabletServersResponse> {
 
-  public ListTabletServersRequest(KuduTable masterTable) {
-    super(masterTable);
+  public ListTabletServersRequest(KuduTable masterTable,
+                                  Timer timer,
+                                  long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
   }
 
   @Override
@@ -61,8 +64,11 @@ public class ListTabletServersRequest extends KuduRpc<ListTabletServersResponse>
     for (ListTabletServersResponsePB.Entry entry : respBuilder.getServersList()) {
       servers.add(entry.getRegistration().getRpcAddresses(0).getHost());
     }
-    ListTabletServersResponse response = new ListTabletServersResponse(deadlineTracker
-        .getElapsedMillis(), tsUUID, serversCount, servers);
+    ListTabletServersResponse response =
+        new ListTabletServersResponse(deadlineTracker.getElapsedMillis(),
+                                      tsUUID,
+                                      serversCount,
+                                      servers);
     return new Pair<ListTabletServersResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
index 1937117..fc979e4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
@@ -30,13 +30,13 @@ public class ListTabletServersResponse extends KuduRpcResponse {
   private final List<String> tabletServersList;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    * @param tabletServersCount How many tablet servers the master is reporting.
    * @param tabletServersList List of tablet servers.
    */
-  ListTabletServersResponse(long ellapsedMillis, String tsUUID,
+  ListTabletServersResponse(long elapsedMillis, String tsUUID,
                             int tabletServersCount, List<String> tabletServersList) {
-    super(ellapsedMillis, tsUUID);
+    super(elapsedMillis, tsUUID);
     this.tabletServersCount = tabletServersCount;
     this.tabletServersList = tabletServersList;
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
index f712f3b..a6d4ff3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.tserver.Tserver;
 import org.apache.kudu.util.Pair;
@@ -29,8 +30,8 @@ import org.apache.kudu.util.Pair;
 @InterfaceAudience.Private
 class ListTabletsRequest extends KuduRpc<ListTabletsResponse> {
 
-  ListTabletsRequest() {
-    super(null);
+  ListTabletsRequest(Timer timer, long timeoutMillis) {
+    super(null, timer, timeoutMillis);
   }
 
   @Override
@@ -61,7 +62,8 @@ class ListTabletsRequest extends KuduRpc<ListTabletsResponse> {
       tablets.add(info.getTabletStatus().getTabletId());
     }
     ListTabletsResponse response = new ListTabletsResponse(deadlineTracker.getElapsedMillis(),
-                                                         tsUUID, tablets);
+                                                           tsUUID,
+                                                           tablets);
     return new Pair<ListTabletsResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
index 1f19778..4ad6b7d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
@@ -26,8 +26,8 @@ public class ListTabletsResponse extends KuduRpcResponse {
 
   private final List<String> tabletsList;
 
-  ListTabletsResponse(long ellapsedMillis, String tsUUID, List<String> tabletsList) {
-    super(ellapsedMillis, tsUUID);
+  ListTabletsResponse(long elapsedMillis, String tsUUID, List<String> tabletsList) {
+    super(elapsedMillis, tsUUID);
     this.tabletsList = tabletsList;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index c0a83af..a585c72 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -29,6 +29,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -93,10 +94,27 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
    * @param table table with the schema to use for this operation
    */
   Operation(KuduTable table) {
-    super(table);
+    super(table, null, 0);
     this.row = table.getSchema().newPartialRow();
   }
 
+  /**
+   * Reset the timeout of this operation.
+   *
+   * TODO(wdberkeley): The fact we have to do this is a sign an Operation should not subclass
+   * KuduRpc.
+   *
+   * @param timeoutMillis the new timeout of the operation in milliseconds
+   */
+  void resetTimeoutMillis(Timer timer, long timeoutMillis) {
+    deadlineTracker.reset();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
+    timeoutTask = AsyncKuduClient.newTimeout(timer, new RpcTimeoutTask(), timeoutMillis);
+  }
+
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
@@ -157,8 +175,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         error = null;
       }
     }
-    OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                                       builder.getTimestamp(), this, error);
+    OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(),
+                                                       tsUUID,
+                                                       builder.getTimestamp(),
+                                                       this,
+                                                       error);
     return new Pair<OperationResponse, Object>(
         response, builder.hasError() ? builder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index 146765f..d4234ac 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -40,8 +40,11 @@ public class OperationResponse extends KuduRpcResponse {
    * @param operation the operation that created this response
    * @param errorPB a row error in pb format, can be null
    */
-  OperationResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                    Operation operation, Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
+  OperationResponse(long elapsedMillis,
+                    String tsUUID,
+                    long writeTimestamp,
+                    Operation operation,
+                    Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     this.rowError = errorPB == null ? null : RowError.fromRowErrorPb(errorPB, operation, tsUUID);
@@ -55,8 +58,11 @@ public class OperationResponse extends KuduRpcResponse {
    * @param operation the operation that created this response
    * @param rowError a parsed row error, can be null
    */
-  OperationResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                    Operation operation, RowError rowError) {
+  OperationResponse(long elapsedMillis,
+                    String tsUUID,
+                    long writeTimestamp,
+                    Operation operation,
+                    RowError rowError) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     this.rowError = rowError;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index 20272c1..d536998 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -34,15 +35,15 @@ class PingRequest extends KuduRpc<PingResponse> {
   private final String serviceName;
 
   static PingRequest makeMasterPingRequest() {
-    return new PingRequest(MASTER_SERVICE_NAME);
+    return new PingRequest(MASTER_SERVICE_NAME, null, 0);
   }
 
   static PingRequest makeTabletServerPingRequest() {
-    return new PingRequest(TABLET_SERVER_SERVICE_NAME);
+    return new PingRequest(TABLET_SERVER_SERVICE_NAME, null, 0);
   }
 
-  private PingRequest(String serviceName) {
-    super(null);
+  private PingRequest(String serviceName, Timer timer, long timeoutMillis) {
+    super(null, timer, timeoutMillis);
     this.serviceName = serviceName;
   }
 
@@ -67,8 +68,7 @@ class PingRequest extends KuduRpc<PingResponse> {
     final Master.PingResponsePB.Builder respBuilder =
         Master.PingResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
-    PingResponse response = new PingResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID);
+    PingResponse response = new PingResponse(deadlineTracker.getElapsedMillis(), tsUUID);
     return new Pair<>(response, null);
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
index e9effd9..3ad4155 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
@@ -48,16 +48,20 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
 
   /**
    * Package private constructor, only meant to be instantiated from AsyncKuduScanner.
-   * @param ellapsedMillis time in milliseconds since RPC creation to now
+   * @param elapsedMillis time in milliseconds since RPC creation to now
    * @param tsUUID UUID of the tablet server that handled our request
    * @param schema schema used to parse the rows
    * @param numRows how many rows are contained in the bs slice
    * @param bs normal row data
    * @param indirectBs indirect row data
    */
-  private RowResultIterator(long ellapsedMillis, String tsUUID, Schema schema,
-                            int numRows, Slice bs, Slice indirectBs) {
-    super(ellapsedMillis, tsUUID);
+  private RowResultIterator(long elapsedMillis,
+                            String tsUUID,
+                            Schema schema,
+                            int numRows,
+                            Slice bs,
+                            Slice indirectBs) {
+    super(elapsedMillis, tsUUID);
     this.schema = schema;
     this.bs = bs;
     this.indirectBs = indirectBs;
@@ -66,13 +70,14 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
     this.rowResult = numRows == 0 ? null : new RowResult(this.schema, this.bs, this.indirectBs);
   }
 
-  static RowResultIterator makeRowResultIterator(long ellapsedMillis, String tsUUID,
+  static RowResultIterator makeRowResultIterator(long elapsedMillis,
+                                                 String tsUUID,
                                                  Schema schema,
                                                  WireProtocol.RowwiseRowBlockPB data,
                                                  final CallResponse callResponse)
       throws KuduException {
     if (data == null || data.getNumRows() == 0) {
-      return new RowResultIterator(ellapsedMillis, tsUUID, schema, 0, null, null);
+      return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null, null);
     }
 
     Slice bs = callResponse.getSidecar(data.getRowsSidecar());
@@ -87,7 +92,7 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
           " bytes of data but expected " + expectedSize + " for " + numRows + " rows");
       throw new NonRecoverableException(statusIllegalState);
     }
-    return new RowResultIterator(ellapsedMillis, tsUUID, schema, numRows, bs, indirectBs);
+    return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs, indirectBs);
   }
 
   /**
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 347e3c8..105ba22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -120,9 +120,6 @@ class RpcProxy {
               .serverInfo(connection.getServerInfo())
               .build());
 
-      if (!rpc.deadlineTracker.hasDeadline()) {
-        LOG.warn("{} sending RPC with no timeout {}", connection.getLogPrefix(), rpc);
-      }
       connection.enqueueMessage(rpcToMessage(client, rpc),
           new Callback<Void, Connection.CallResponseInfo>() {
             @Override
@@ -164,11 +161,10 @@ class RpcProxy {
                 .setServiceName(rpc.serviceName())
                 .setMethodName(rpc.method()));
     final Message reqPB = rpc.createRequestPB();
-
+    // TODO(wdberkeley): We should enforce that every RPC has a timeout.
     if (rpc.deadlineTracker.hasDeadline()) {
       headerBuilder.setTimeoutMillis((int) rpc.deadlineTracker.getMillisBeforeDeadline());
     }
-
     if (rpc.isRequestTracked()) {
       RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
       final RequestTracker requestTracker = client.getRequestTracker();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index a7c9112..f6c7beb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -153,7 +153,7 @@ public class TestAsyncKuduSession {
       client.deleteTable(TABLE_NAME).join();
       // Wait until tablet is deleted on TS.
       while (true) {
-        ListTabletsRequest req = new ListTabletsRequest();
+        ListTabletsRequest req = new ListTabletsRequest(client.getTimer(), 10000);
         Deferred<ListTabletsResponse> d = req.getDeferred();
         proxy.sendRpc(req);
         ListTabletsResponse resp = d.join();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index ead72b2..64b6ed2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -117,6 +117,6 @@ public class TestConnectionCache {
     PingRequest ping = PingRequest.makeMasterPingRequest();
     Deferred<PingResponse> d = ping.getDeferred();
     proxy.sendRpc(ping);
-    d.join();
+    d.join(10000);
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 20033ca..1d5d2c8 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -19,10 +19,12 @@ package org.apache.kudu.client;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -54,16 +56,18 @@ public class TestTimeouts {
       }
 
       harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-      KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-      KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
+      // openTable() may time out, nextRows() should time out.
+      try {
+        KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-      OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-      assertTrue(response.hasRowError());
-      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+        KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-      KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
-      try {
+        OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+        assertTrue(response.hasRowError());
+        assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+
+        KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
         lowTimeoutScanner.nextRows();
         fail("Should have timed out");
       } catch (KuduException ex) {
@@ -71,4 +75,52 @@ public class TestTimeouts {
       }
     }
   }
+
+  /**
+   * KUDU-1868: This test checks that, even if there is no event on the channel over which an RPC
+   * was sent (e.g., even if the server hangs and does not respond), RPCs will still time out.
+   */
+  @Test(timeout = 100000)
+  @TabletServerConfig(flags = { "--scanner_inject_latency_on_each_batch_ms=200000" })
+  public void testTimeoutEvenWhenServerHangs() throws Exception {
+    // Set up a table with one row.
+    KuduClient client = harness.getClient();
+    KuduTable table = client.createTable(
+        TABLE_NAME,
+        getBasicSchema(),
+        getBasicCreateTableOptions());
+    assertFalse(client
+        .newSession()
+        .apply(createBasicSchemaInsert(table, 0))
+        .hasRowError());
+
+    // Create a new client with no socket read timeout (0 means do not set a read timeout).
+    try (KuduClient noRecvTimeoutClient =
+             new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+                 .defaultSocketReadTimeoutMs(0)
+                 .build()) {
+      // Propagate the timestamp to be sure we should see the row that was
+      // inserted by another client.
+      noRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+      KuduTable noRecvTimeoutTable = noRecvTimeoutClient.openTable(TABLE_NAME);
+
+      // Do something besides a scan to cache table and tablet lookup.
+      noRecvTimeoutClient.getTablesList();
+
+      // Scan with a short timeout.
+      KuduScanner scanner = noRecvTimeoutClient
+          .newScannerBuilder(noRecvTimeoutTable)
+          .scanRequestTimeout(1000)
+          .build();
+
+      // The server will not respond for the lifetime of the test, so we expect
+      // the operation to time out.
+      try {
+        scanner.nextRows();
+        fail("should not have completed nextRows");
+      } catch (NonRecoverableException e) {
+        assertTrue(e.getStatus().isTimedOut());
+      }
+    }
+  }
 }


Mime
View raw message