kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7050; Decrease default consumer request timeout to 30s (#5203)
Date Wed, 13 Jun 2018 23:21:49 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 443091b  KAFKA-7050; Decrease default consumer request timeout to 30s (#5203)
443091b is described below

commit 443091b844d4119637d252a5303568e22d4f1d48
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jun 13 16:21:30 2018 -0700

    KAFKA-7050; Decrease default consumer request timeout to 30s (#5203)
    
    This patch changes the default `request.timeout.ms` of the consumer to 30 seconds. Additionally, it adds logic to `NetworkClient` and related components to support timeouts at the request level. We use this to handle the special case of the JoinGroup request, which may block for as long as the value configured by `max.poll.interval.ms`.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <guozhang@confluent.io>
---
 .../org/apache/kafka/clients/ClientRequest.java    |   7 ++
 .../org/apache/kafka/clients/InFlightRequests.java |  25 ++--
 .../java/org/apache/kafka/clients/KafkaClient.java |  11 +-
 .../org/apache/kafka/clients/NetworkClient.java    | 140 +++++++++++++++------
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   9 +-
 .../consumer/internals/AbstractCoordinator.java    |  30 +++--
 .../consumer/internals/ConsumerNetworkClient.java  |  39 +++---
 .../kafka/clients/producer/internals/Sender.java   |  23 ++--
 .../apache/kafka/clients/InFlightRequestsTest.java |  30 ++++-
 .../java/org/apache/kafka/clients/MockClient.java  |  36 +++++-
 .../apache/kafka/clients/NetworkClientTest.java    |  75 ++++++-----
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../internals/AbstractCoordinatorTest.java         | 127 +++++++++++++++----
 .../internals/ConsumerCoordinatorTest.java         |  58 +++++----
 .../internals/ConsumerNetworkClientTest.java       |  31 ++++-
 .../clients/consumer/internals/FetcherTest.java    |   6 +-
 .../clients/producer/internals/SenderTest.java     |   4 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |   2 +-
 .../scala/kafka/common/InterBrokerSendThread.scala |  16 +--
 .../TransactionMarkerChannelManager.scala          |   2 +-
 .../kafka/common/InterBrokerSendThreadTest.scala   |  21 ++--
 docs/upgrade.html                                  |   5 +
 23 files changed, 492 insertions(+), 211 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 9b62946..7b44ca3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -31,6 +31,7 @@ public final class ClientRequest {
     private final String clientId;
     private final long createdTimeMs;
     private final boolean expectResponse;
+    private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
 
     /**
@@ -48,6 +49,7 @@ public final class ClientRequest {
                          String clientId,
                          long createdTimeMs,
                          boolean expectResponse,
+                         int requestTimeoutMs,
                          RequestCompletionHandler callback) {
         this.destination = destination;
         this.requestBuilder = requestBuilder;
@@ -55,6 +57,7 @@ public final class ClientRequest {
         this.clientId = clientId;
         this.createdTimeMs = createdTimeMs;
         this.expectResponse = expectResponse;
+        this.requestTimeoutMs = requestTimeoutMs;
         this.callback = callback;
     }
 
@@ -101,4 +104,8 @@ public final class ClientRequest {
     public int correlationId() {
         return correlationId;
     }
+
+    public int requestTimeoutMs() {
+        return requestTimeoutMs;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 5caee2d..5b7ba61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -17,11 +17,11 @@
 package org.apache.kafka.clients;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -162,25 +162,28 @@ final class InFlightRequests {
         }
     }
 
+    private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
+        for (NetworkClient.InFlightRequest request : deque) {
+            long timeSinceSend = Math.max(0, now - request.sendTimeMs);
+            if (timeSinceSend > request.requestTimeoutMs)
+                return true;
+        }
+        return false;
+    }
+
     /**
      * Returns a list of nodes with pending in-flight request, that need to be timed out
      *
      * @param now current time in milliseconds
-     * @param requestTimeoutMs max time to wait for the request to be completed
      * @return list of nodes
      */
-    public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs) {
-        List<String> nodeIds = new LinkedList<>();
+    public List<String> nodesWithTimedOutRequests(long now) {
+        List<String> nodeIds = new ArrayList<>();
         for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
             String nodeId = requestEntry.getKey();
             Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
-
-            if (!deque.isEmpty()) {
-                NetworkClient.InFlightRequest request = deque.peekLast();
-                long timeSinceSend = now - request.sendTimeMs;
-                if (timeSinceSend > requestTimeoutMs)
-                    nodeIds.add(nodeId);
-            }
+            if (hasExpiredRequest(now, deque))
+                nodeIds.add(nodeId);
         }
         return nodeIds;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 49bf3a3..448932e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -185,9 +185,16 @@ public interface KafkaClient extends Closeable {
      * @param requestBuilder the request builder to use
      * @param createdTimeMs the time in milliseconds to use as the creation time of the request
      * @param expectResponse true iff we expect a response
+     * @param requestTimeoutMs Upper bound time in milliseconds to await a response before disconnecting the socket and
+     *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
+     *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
      */
-    ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
-                                   boolean expectResponse, RequestCompletionHandler callback);
+    ClientRequest newClientRequest(String nodeId,
+                                   AbstractRequest.Builder<?> requestBuilder,
+                                   long createdTimeMs,
+                                   boolean expectResponse,
+                                   int requestTimeoutMs,
+                                   RequestCompletionHandler callback);
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7c87277..619f7bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -88,8 +88,8 @@ public class NetworkClient implements KafkaClient {
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
-    /* max time in ms for the producer to wait for acknowledgement from server*/
-    private final int requestTimeoutMs;
+    /* default timeout for individual requests to await acknowledgement from servers */
+    private final int defaultRequestTimeoutMs;
 
     /* time in ms to wait before retrying to create connection to a server */
     private final long reconnectBackoffMs;
@@ -117,15 +117,26 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMax,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs,
+                         int defaultRequestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
                          LogContext logContext) {
-        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-             reconnectBackoffMs, reconnectBackoffMax,
-             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, null, logContext);
+        this(null,
+             metadata,
+             selector,
+             clientId,
+             maxInFlightRequestsPerConnection,
+             reconnectBackoffMs,
+             reconnectBackoffMax,
+             socketSendBuffer,
+             socketReceiveBuffer,
+             defaultRequestTimeoutMs,
+             time,
+             discoverBrokerVersions,
+             apiVersions,
+             null,
+             logContext);
     }
 
     public NetworkClient(Selectable selector,
@@ -136,16 +147,27 @@ public class NetworkClient implements KafkaClient {
             long reconnectBackoffMax,
             int socketSendBuffer,
             int socketReceiveBuffer,
-            int requestTimeoutMs,
+            int defaultRequestTimeoutMs,
             Time time,
             boolean discoverBrokerVersions,
             ApiVersions apiVersions,
             Sensor throttleTimeSensor,
             LogContext logContext) {
-        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-             reconnectBackoffMs, reconnectBackoffMax,
-             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, throttleTimeSensor, logContext);
+        this(null,
+             metadata,
+             selector,
+             clientId,
+             maxInFlightRequestsPerConnection,
+             reconnectBackoffMs,
+             reconnectBackoffMax,
+             socketSendBuffer,
+             socketReceiveBuffer,
+             defaultRequestTimeoutMs,
+             time,
+             discoverBrokerVersions,
+             apiVersions,
+             throttleTimeSensor,
+             logContext);
     }
 
     public NetworkClient(Selectable selector,
@@ -156,15 +178,26 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMax,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs,
+                         int defaultRequestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
                          LogContext logContext) {
-        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection,
-             reconnectBackoffMs, reconnectBackoffMax,
-             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, null, logContext);
+        this(metadataUpdater,
+             null,
+             selector,
+             clientId,
+             maxInFlightRequestsPerConnection,
+             reconnectBackoffMs,
+             reconnectBackoffMax,
+             socketSendBuffer,
+             socketReceiveBuffer,
+             defaultRequestTimeoutMs,
+             time,
+             discoverBrokerVersions,
+             apiVersions,
+             null,
+             logContext);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -176,7 +209,7 @@ public class NetworkClient implements KafkaClient {
                           long reconnectBackoffMax,
                           int socketSendBuffer,
                           int socketReceiveBuffer,
-                          int requestTimeoutMs,
+                          int defaultRequestTimeoutMs,
                           Time time,
                           boolean discoverBrokerVersions,
                           ApiVersions apiVersions,
@@ -201,7 +234,7 @@ public class NetworkClient implements KafkaClient {
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
         this.randOffset = new Random();
-        this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
         this.reconnectBackoffMs = reconnectBackoffMs;
         this.time = time;
         this.discoverBrokerVersions = discoverBrokerVersions;
@@ -426,31 +459,28 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
-        String nodeId = clientRequest.destination();
+        String destination = clientRequest.destination();
         RequestHeader header = clientRequest.makeHeader(request.version());
         if (log.isDebugEnabled()) {
             int latestClientVersion = clientRequest.apiKey().latestVersion();
             if (header.apiVersion() == latestClientVersion) {
                 log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
-                        clientRequest.correlationId(), nodeId);
+                        clientRequest.correlationId(), destination);
             } else {
                 log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
-                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
+                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
             }
         }
-        Send send = request.toSend(nodeId, header);
+        Send send = request.toSend(destination, header);
         InFlightRequest inFlightRequest = new InFlightRequest(
+                clientRequest,
                 header,
-                clientRequest.createdTimeMs(),
-                clientRequest.destination(),
-                clientRequest.callback(),
-                clientRequest.expectResponse(),
                 isInternalRequest,
                 request,
                 send,
                 now);
         this.inFlightRequests.add(inFlightRequest);
-        selector.send(inFlightRequest.send);
+        selector.send(send);
     }
 
     /**
@@ -475,7 +505,7 @@ public class NetworkClient implements KafkaClient {
 
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
         try {
-            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
+            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
         } catch (IOException e) {
             log.error("Unexpected error during I/O", e);
         }
@@ -658,7 +688,7 @@ public class NetworkClient implements KafkaClient {
      * @param now The current time
      */
     private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
-        List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
+        List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
         for (String nodeId : nodeIds) {
             // close connection to the node
             this.selector.close(nodeId);
@@ -868,7 +898,7 @@ public class NetworkClient implements KafkaClient {
         public long maybeUpdate(long now) {
             // should we update our metadata?
             long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-            long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
+            long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
 
             long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
 
@@ -965,7 +995,7 @@ public class NetworkClient implements KafkaClient {
 
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
-                return requestTimeoutMs;
+                return defaultRequestTimeoutMs;
             }
 
             // If there's any connection establishment underway, wait until it completes. This prevents
@@ -993,16 +1023,26 @@ public class NetworkClient implements KafkaClient {
     }
 
     @Override
-    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+    public ClientRequest newClientRequest(String nodeId,
+                                          AbstractRequest.Builder<?> requestBuilder,
+                                          long createdTimeMs,
                                           boolean expectResponse) {
-        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null);
     }
 
     @Override
-    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
-                                          boolean expectResponse, RequestCompletionHandler callback) {
+    public ClientRequest newClientRequest(String nodeId,
+                                          AbstractRequest.Builder<?> requestBuilder,
+                                          long createdTimeMs,
+                                          boolean expectResponse,
+                                          int requestTimeoutMs,
+                                          RequestCompletionHandler callback) {
         return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
-                callback);
+                requestTimeoutMs, callback);
+    }
+
+    public boolean discoverBrokerVersions() {
+        return discoverBrokerVersions;
     }
 
     static class InFlightRequest {
@@ -1015,8 +1055,28 @@ public class NetworkClient implements KafkaClient {
         final Send send;
         final long sendTimeMs;
         final long createdTimeMs;
+        final long requestTimeoutMs;
+
+        public InFlightRequest(ClientRequest clientRequest,
+                               RequestHeader header,
+                               boolean isInternalRequest,
+                               AbstractRequest request,
+                               Send send,
+                               long sendTimeMs) {
+            this(header,
+                 clientRequest.requestTimeoutMs(),
+                 clientRequest.createdTimeMs(),
+                 clientRequest.destination(),
+                 clientRequest.callback(),
+                 clientRequest.expectResponse(),
+                 isInternalRequest,
+                 request,
+                 send,
+                 sendTimeMs);
+        }
 
         public InFlightRequest(RequestHeader header,
+                               int requestTimeoutMs,
                                long createdTimeMs,
                                String destination,
                                RequestCompletionHandler callback,
@@ -1026,6 +1086,8 @@ public class NetworkClient implements KafkaClient {
                                Send send,
                                long sendTimeMs) {
             this.header = header;
+            this.requestTimeoutMs = requestTimeoutMs;
+            this.createdTimeMs = createdTimeMs;
             this.destination = destination;
             this.callback = callback;
             this.expectResponse = expectResponse;
@@ -1033,7 +1095,6 @@ public class NetworkClient implements KafkaClient {
             this.request = request;
             this.send = send;
             this.sendTimeMs = sendTimeMs;
-            this.createdTimeMs = createdTimeMs;
         }
 
         public ClientResponse completed(AbstractResponse response, long timeMs) {
@@ -1060,7 +1121,4 @@ public class NetworkClient implements KafkaClient {
         }
     }
 
-    public boolean discoverBrokerVersions() {
-        return discoverBrokerVersions;
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bc9a716..fefeae3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -403,7 +403,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         VALUE_DESERIALIZER_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
-                                        305000, // chosen to be higher than the default of max.poll.interval.ms
+                                        30000,
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d6973c0..76e0fcc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -668,10 +668,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
-            int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
-            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
-                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+            int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+            if (this.requestTimeoutMs < sessionTimeoutMs)
+                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " (" + requestTimeoutMs +
+                                                  ") cannot be lower than  " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG +
+                                                  " (" + sessionTimeoutMs + ")");
             this.time = Time.SYSTEM;
 
             Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index adbaae7..8f68138 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -319,7 +319,11 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return true iff the group is active
      */
     boolean ensureActiveGroup(final long timeoutMs) {
-        final long startTime = time.milliseconds();
+        return ensureActiveGroup(timeoutMs, time.milliseconds());
+    }
+
+    // Visible for testing
+    boolean ensureActiveGroup(long timeoutMs, long startMs) {
         // always ensure that the coordinator is ready because we may have been disconnected
         // when sending heartbeats and does not necessarily require us to rejoin the group.
         if (!ensureCoordinatorReady(timeoutMs)) {
@@ -328,7 +332,9 @@ public abstract class AbstractCoordinator implements Closeable {
 
         startHeartbeatThreadIfNeeded();
 
-        return joinGroupIfNeeded(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startTime));
+        long joinStartMs = time.milliseconds();
+        long joinTimeoutMs = remainingTimeAtLeastZero(timeoutMs, joinStartMs - startMs);
+        return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
     }
 
     private synchronized void startHeartbeatThreadIfNeeded() {
@@ -366,17 +372,17 @@ public abstract class AbstractCoordinator implements Closeable {
      * Visible for testing.
      *
      * @param timeoutMs Time to complete this action
+     * @param startTimeMs Current time when invoked
      * @return true iff the operation succeeded
      */
-    boolean joinGroupIfNeeded(final long timeoutMs) {
-        final long startTime = time.milliseconds();
+    boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
         long elapsedTime = 0L;
 
         while (rejoinNeededOrPending()) {
             if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                 return false;
             }
-            elapsedTime = time.milliseconds() - startTime;
+            elapsedTime = time.milliseconds() - startTimeMs;
 
             // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
             // time if the client is woken up before a pending rebalance completes. This must be called
@@ -415,7 +421,7 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             if (rejoinNeededOrPending()) {
-                elapsedTime = time.milliseconds() - startTime;
+                elapsedTime = time.milliseconds() - startTimeMs;
             }
         }
         return true;
@@ -473,9 +479,12 @@ public abstract class AbstractCoordinator implements Closeable {
      * Join the group and return the assignment for the next generation. This function handles both
      * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
      * elected leader by the coordinator.
+     *
+     * NOTE: This is visible only for testing
+     *
      * @return A request future which wraps the assignment returned from the group leader
      */
-    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
+    RequestFuture<ByteBuffer> sendJoinGroupRequest() {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
@@ -489,7 +498,12 @@ public abstract class AbstractCoordinator implements Closeable {
                 metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
-        return client.send(coordinator, requestBuilder)
+
+        // Note that we override the request timeout using the rebalance timeout since that is the
+        // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
+
+        int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
+        return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                 .compose(new JoinGroupResponseHandler());
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index a9e167a..0bf0aad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -62,7 +62,7 @@ public class ConsumerNetworkClient implements Closeable {
     private final Time time;
     private final long retryBackoffMs;
     private final int maxPollTimeoutMs;
-    private final long unsentExpiryMs;
+    private final int requestTimeoutMs;
     private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
 
     // We do not need high throughput, so use a fair lock to try to avoid starvation
@@ -83,7 +83,7 @@ public class ConsumerNetworkClient implements Closeable {
                                  Metadata metadata,
                                  Time time,
                                  long retryBackoffMs,
-                                 long requestTimeoutMs,
+                                 int requestTimeoutMs,
                                  int maxPollTimeoutMs) {
         this.log = logContext.logger(ConsumerNetworkClient.class);
         this.client = client;
@@ -91,7 +91,15 @@ public class ConsumerNetworkClient implements Closeable {
         this.time = time;
         this.retryBackoffMs = retryBackoffMs;
         this.maxPollTimeoutMs = Math.min(maxPollTimeoutMs, MAX_POLL_TIMEOUT_MS);
-        this.unsentExpiryMs = requestTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+
+    /**
+     * Send a request with the default timeout. See {@link #send(Node, AbstractRequest.Builder, int)}.
+     */
+    public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
+        return send(node, requestBuilder, requestTimeoutMs);
     }
 
     /**
@@ -104,13 +112,18 @@ public class ConsumerNetworkClient implements Closeable {
      *
      * @param node The destination of the request
      * @param requestBuilder A builder for the request payload
+     * @param requestTimeoutMs Maximum time in milliseconds to await a response before disconnecting the socket and
+     *                         cancelling the request. The request may be cancelled sooner if the socket disconnects
+     *                         for any reason.
      * @return A future which indicates the result of the send.
      */
-    public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
+    public RequestFuture<ClientResponse> send(Node node,
+                                              AbstractRequest.Builder<?> requestBuilder,
+                                              int requestTimeoutMs) {
         long now = time.milliseconds();
         RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
         ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
-                completionHandler);
+                requestTimeoutMs, completionHandler);
         unsent.put(node, clientRequest);
 
         // wakeup the client in case it is blocking in poll so that we can send the queued request
@@ -137,13 +150,6 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
-     * Block until the metadata has been refreshed.
-     */
-    public void awaitMetadataUpdate() {
-        awaitMetadataUpdate(Long.MAX_VALUE);
-    }
-
-    /**
      * Block waiting on the metadata refresh with a timeout.
      *
      * @return true if update succeeded, false otherwise.
@@ -444,10 +450,10 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
-        Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
+        Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now);
         for (ClientRequest request : expiredRequests) {
             RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
-            handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+            handler.onFailure(new TimeoutException("Failed to send request after " + request.requestTimeoutMs() + " ms."));
         }
     }
 
@@ -655,13 +661,14 @@ public class ConsumerNetworkClient implements Closeable {
             return false;
         }
 
-        public Collection<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
+        private Collection<ClientRequest> removeExpiredRequests(long now) {
             List<ClientRequest> expiredRequests = new ArrayList<>();
             for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {
                 Iterator<ClientRequest> requestIterator = requests.iterator();
                 while (requestIterator.hasNext()) {
                     ClientRequest request = requestIterator.next();
-                    if (request.createdTimeMs() < now - unsentExpiryMs) {
+                    long elapsedMs = Math.max(0, now - request.createdTimeMs());
+                    if (elapsedMs > request.requestTimeoutMs()) {
                         expiredRequests.add(request);
                         requestIterator.remove();
                     } else
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7c94179..5cb9046 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -109,7 +109,7 @@ public class Sender implements Runnable {
     private final SenderMetrics sensors;
 
     /* the max time to wait for the server to respond to the request*/
-    private final int requestTimeout;
+    private final int requestTimeoutMs;
 
     /* The max time to wait before retrying a request which has failed */
     private final long retryBackoffMs;
@@ -130,7 +130,7 @@ public class Sender implements Runnable {
                   int retries,
                   SenderMetricsRegistry metricsRegistry,
                   Time time,
-                  int requestTimeout,
+                  int requestTimeoutMs,
                   long retryBackoffMs,
                   TransactionManager transactionManager,
                   ApiVersions apiVersions) {
@@ -145,7 +145,7 @@ public class Sender implements Runnable {
         this.retries = retries;
         this.time = time;
         this.sensors = new SenderMetrics(metricsRegistry);
-        this.requestTimeout = requestTimeout;
+        this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
@@ -280,7 +280,7 @@ public class Sender implements Runnable {
             }
         }
 
-        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
+        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now);
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
@@ -342,12 +342,12 @@ public class Sender implements Runnable {
                         break;
                     }
 
-                    if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
+                    if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeoutMs)) {
                         transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
                     }
                 } else {
-                    targetNode = awaitLeastLoadedNodeReady(requestTimeout);
+                    targetNode = awaitLeastLoadedNodeReady(requestTimeoutMs);
                 }
 
                 if (targetNode != null) {
@@ -355,7 +355,7 @@ public class Sender implements Runnable {
                         time.sleep(nextRequestHandler.retryBackoffMs());
 
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
-                            requestBuilder, now, true, nextRequestHandler);
+                            requestBuilder, now, true, requestTimeoutMs, nextRequestHandler);
                     transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
                     log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
 
@@ -409,7 +409,7 @@ public class Sender implements Runnable {
     private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
         String nodeId = node.idString();
         InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
-        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
+        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
 
@@ -424,7 +424,7 @@ public class Sender implements Runnable {
     private void maybeWaitForProducerId() {
         while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
             try {
-                Node node = awaitLeastLoadedNodeReady(requestTimeout);
+                Node node = awaitLeastLoadedNodeReady(requestTimeoutMs);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                     InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
@@ -652,7 +652,7 @@ public class Sender implements Runnable {
      */
     private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
         for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
-            sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
+            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
     }
 
     /**
@@ -702,7 +702,8 @@ public class Sender implements Runnable {
         };
 
         String nodeId = Integer.toString(destination);
-        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
+        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
+                requestTimeoutMs, callback);
         client.send(clientRequest, now);
         log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
index 600e5dc..c7b9eb9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
@@ -19,10 +19,14 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -65,6 +69,24 @@ public class InFlightRequestsTest {
     }
 
     @Test
+    public void testTimedOutNodes() {
+        Time time = new MockTime();
+
+        addRequest("A", time.milliseconds(), 50);
+        addRequest("B", time.milliseconds(), 200);
+        addRequest("B", time.milliseconds(), 100);
+
+        time.sleep(50);
+        assertEquals(Collections.emptyList(), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
+
+        time.sleep(25);
+        assertEquals(Collections.singletonList("A"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
+
+        time.sleep(50);
+        assertEquals(Arrays.asList("A", "B"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
+    }
+
+    @Test
     public void testCompleteNext() {
         int correlationId1 = addRequest(dest);
         int correlationId2 = addRequest(dest);
@@ -88,12 +110,16 @@ public class InFlightRequestsTest {
     }
 
     private int addRequest(String destination) {
+        return addRequest(destination, 0, 10000);
+    }
+
+    private int addRequest(String destination, long sendTimeMs, int requestTimeoutMs) {
         int correlationId = this.correlationId;
         this.correlationId += 1;
 
         RequestHeader requestHeader = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", correlationId);
-        NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, 0,
-                destination, null, false, false, null, null, 0);
+        NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, requestTimeoutMs, 0,
+                destination, null, false, false, null, null, sendTimeMs);
         inFlightRequests.add(ifr);
         return correlationId;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 2a1e213..e82b0dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -83,6 +83,8 @@ public class MockClient implements KafkaClient {
     private final TransientSet<Node> blackedOut;
     // Nodes which will always fail to connect, but can be chosen by leastLoadedNode
     private final TransientSet<Node> unreachable;
+    // Nodes which have a delay before ultimately succeeding to connect
+    private final TransientSet<Node> delayedReady;
 
     private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
     private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
@@ -105,6 +107,7 @@ public class MockClient implements KafkaClient {
         this.unavailableTopics = Collections.emptySet();
         this.blackedOut = new TransientSet<>(time);
         this.unreachable = new TransientSet<>(time);
+        this.delayedReady = new TransientSet<>(time);
     }
 
     @Override
@@ -122,6 +125,9 @@ public class MockClient implements KafkaClient {
             return false;
         }
 
+        if (delayedReady.contains(node, now))
+            return false;
+
         ready.add(node.idString());
         return true;
     }
@@ -145,6 +151,10 @@ public class MockClient implements KafkaClient {
         unreachable.add(node, durationMs);
     }
 
+    public void delayReady(Node node, long durationMs) {
+        delayedReady.add(node, durationMs);
+    }
+
     public void authenticationFailed(Node node, long blackoutMs) {
         pendingAuthenticationErrors.remove(node);
         authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
@@ -267,6 +277,7 @@ public class MockClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
         maybeAwaitWakeup();
+        checkTimeoutOfPendingRequests(now);
 
         List<ClientResponse> copy = new ArrayList<>(this.responses);
 
@@ -296,6 +307,19 @@ public class MockClient implements KafkaClient {
         return copy;
     }
 
+    private long elapsedTimeMs(long currentTimeMs, long startTimeMs) {
+        return Math.max(0, currentTimeMs - startTimeMs);
+    }
+
+    private void checkTimeoutOfPendingRequests(long nowMs) {
+        ClientRequest request = requests.peek();
+        while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
+            disconnect(request.destination());
+            requests.poll();
+            request = requests.peek();
+        }
+    }
+
     public Queue<ClientRequest> requests() {
         return this.requests;
     }
@@ -493,14 +517,18 @@ public class MockClient implements KafkaClient {
     @Override
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse) {
-        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null);
     }
 
     @Override
-    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
-                                          boolean expectResponse, RequestCompletionHandler callback) {
+    public ClientRequest newClientRequest(String nodeId,
+                                          AbstractRequest.Builder<?> requestBuilder,
+                                          long createdTimeMs,
+                                          boolean expectResponse,
+                                          int requestTimeoutMs,
+                                          RequestCompletionHandler callback) {
         return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
-                expectResponse, callback);
+                expectResponse, requestTimeoutMs, callback);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index f83226c..e13fcef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -47,11 +47,12 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class NetworkClientTest {
 
-    protected final int requestTimeoutMs = 1000;
+    protected final int minRequestTimeoutMs = 1000;
     protected final MockTime time = new MockTime();
     protected final MockSelector selector = new MockSelector(time);
     protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
@@ -69,19 +70,19 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+                minRequestTimeoutMs, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-                "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, requestTimeoutMs,
+                "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, minRequestTimeoutMs,
                 time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions(), new LogContext());
+                64 * 1024, 64 * 1024, minRequestTimeoutMs, time, false, new ApiVersions(), new LogContext());
     }
 
     @Before
@@ -140,10 +141,10 @@ public class NetworkClientTest {
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-                        Collections.<TopicPartition, MemoryRecords>emptyMap());
+                        Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = networkClient.newClientRequest(
-                node.idString(), builder, time.milliseconds(), true, handler);
+                node.idString(), builder, time.milliseconds(), true, minRequestTimeoutMs, handler);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
@@ -184,16 +185,28 @@ public class NetworkClientTest {
     public void testRequestTimeout() {
         awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
-                1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
+                1000, Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        long now = time.milliseconds();
-        ClientRequest request = client.newClientRequest(
-                node.idString(), builder, now, true, handler);
-        client.send(request, now);
+        int requestTimeoutMs = minRequestTimeoutMs + 5000;
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+                requestTimeoutMs, handler);
+        testRequestTimeout(request);
+    }
+
+    @Test
+    public void testMinRequestTimeout() {
+        awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
+                1000, Collections.emptyMap());
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
+        testRequestTimeout(request);
+    }
 
-        // sleeping to make sure that the time since last send is greater than requestTimeOut
-        time.sleep(3000);
-        List<ClientResponse> responses = client.poll(3000, time.milliseconds());
+    private void testRequestTimeout(ClientRequest request) {
+        client.send(request, time.milliseconds());
+
+        time.sleep(request.requestTimeoutMs() + 1);
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
 
         assertEquals(1, responses.size());
         ClientResponse clientResponse = responses.get(0);
@@ -206,9 +219,10 @@ public class NetworkClientTest {
         // Instrument the test to return a response with a 100ms throttle delay.
         awaitReady(client, node);
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-            Collections.<TopicPartition, MemoryRecords>emptyMap());
+            Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler);
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+                minRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
@@ -222,7 +236,7 @@ public class NetworkClientTest {
         resp.writeTo(buffer);
         buffer.flip();
         selector.completeReceive(new NetworkReceive(node.idString(), buffer));
-        List<ClientResponse> responses = client.poll(1, time.milliseconds());
+        client.poll(1, time.milliseconds());
 
         // The connection is not ready due to throttling.
         assertFalse(client.ready(node, time.milliseconds()));
@@ -264,9 +278,10 @@ public class NetworkClientTest {
         selector.clear();
 
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-            Collections.<TopicPartition, MemoryRecords>emptyMap());
+            Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler);
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+                minRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
@@ -280,7 +295,7 @@ public class NetworkClientTest {
         resp.writeTo(buffer);
         buffer.flip();
         selector.completeReceive(new NetworkReceive(node.idString(), buffer));
-        List<ClientResponse> responses = client.poll(1, time.milliseconds());
+        client.poll(1, time.milliseconds());
 
         // Since client-side throttling is disabled, the connection is ready even though the response indicated a
         // throttle delay.
@@ -308,7 +323,7 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
         leastNode = client.leastLoadedNode(time.milliseconds());
-        assertEquals("There should be NO leastloadednode", leastNode, null);
+        assertNull("There should be NO leastloadednode", leastNode);
     }
 
     @Test
@@ -334,7 +349,7 @@ public class NetworkClientTest {
         awaitReady(clientWithNoExponentialBackoff, node);
 
         selector.serverDisconnect(node.idString());
-        clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds());
+        clientWithNoExponentialBackoff.poll(minRequestTimeoutMs, time.milliseconds());
         long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds());
 
         assertEquals(reconnectBackoffMsTest, delay);
@@ -346,7 +361,7 @@ public class NetworkClientTest {
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
         selector.serverDisconnect(node.idString());
-        client.poll(requestTimeoutMs, time.milliseconds());
+        client.poll(minRequestTimeoutMs, time.milliseconds());
 
         // Second attempt should have the same behaviour as exponential backoff is disabled
         assertEquals(reconnectBackoffMsTest, delay);
@@ -376,7 +391,7 @@ public class NetworkClientTest {
 
         // First disconnection
         selector.serverDisconnect(node.idString());
-        client.poll(requestTimeoutMs, time.milliseconds());
+        client.poll(minRequestTimeoutMs, time.milliseconds());
         long delay = client.connectionDelay(node, time.milliseconds());
         long expectedDelay = reconnectBackoffMsTest;
         double jitter = 0.3;
@@ -389,7 +404,7 @@ public class NetworkClientTest {
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
         selector.serverDisconnect(node.idString());
-        client.poll(requestTimeoutMs, time.milliseconds());
+        client.poll(minRequestTimeoutMs, time.milliseconds());
 
         // Second attempt should take twice as long with twice the jitter
         expectedDelay = Math.round(delay * 2);
@@ -408,13 +423,13 @@ public class NetworkClientTest {
         long now = time.milliseconds();
         ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
         client.send(request, now);
-        client.poll(requestTimeoutMs, now);
+        client.poll(minRequestTimeoutMs, now);
         assertEquals(1, client.inFlightRequestCount(node.idString()));
         assertTrue(client.hasInFlightRequests(node.idString()));
         assertTrue(client.hasInFlightRequests());
 
         selector.close(node.idString());
-        List<ClientResponse> responses = client.poll(requestTimeoutMs, time.milliseconds());
+        List<ClientResponse> responses = client.poll(minRequestTimeoutMs, time.milliseconds());
         assertEquals(1, responses.size());
         assertTrue(responses.iterator().next().wasDisconnected());
     }
@@ -442,7 +457,7 @@ public class NetworkClientTest {
     }
 
     @Test
-    public void testDisconnectWithMultipleInFlights() throws Exception {
+    public void testDisconnectWithMultipleInFlights() {
         NetworkClient client = this.clientWithNoVersionDiscovery;
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
@@ -459,11 +474,11 @@ public class NetworkClientTest {
             }
         };
 
-        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, callback);
+        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback);
         client.send(request1, now);
         client.poll(0, now);
 
-        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, callback);
+        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback);
         client.send(request2, now);
         client.poll(0, now);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b8681e8..97ec082 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1747,7 +1747,7 @@ public class KafkaConsumerTest {
         String groupId = "mock-group";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
-        long requestTimeoutMs = 30000;
+        int requestTimeoutMs = 30000;
         int defaultApiTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
@@ -1762,7 +1762,7 @@ public class KafkaConsumerTest {
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
         List<PartitionAssignor> assignors = singletonList(assignor);
-        ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());
+        ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
 
         Metrics metrics = new Metrics();
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 32aae44..f88e725 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -45,11 +45,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -59,9 +64,9 @@ public class AbstractCoordinatorTest {
     private static final int REBALANCE_TIMEOUT_MS = 60000;
     private static final int SESSION_TIMEOUT_MS = 10000;
     private static final int HEARTBEAT_INTERVAL_MS = 3000;
-    private static final long RETRY_BACKOFF_MS = 20;
-    private static final long LONG_RETRY_BACKOFF_MS = 10000;
-    private static final long REQUEST_TIMEOUT_MS = 40000;
+    private static final int RETRY_BACKOFF_MS = 100;
+    private static final int LONG_RETRY_BACKOFF_MS = 10000;
+    private static final int REQUEST_TIMEOUT_MS = 40000;
     private static final String GROUP_ID = "dummy-group";
     private static final String METRIC_GROUP_PREFIX = "consumer";
 
@@ -72,27 +77,35 @@ public class AbstractCoordinatorTest {
     private ConsumerNetworkClient consumerClient;
     private DummyCoordinator coordinator;
 
-    private void setupCoordinator(long retryBackoffMs) {
+    private void setupCoordinator() {
+        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+    }
+
+    private void setupCoordinator(int retryBackoffMs) {
+        setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS);
+    }
+
+    private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
         this.mockTime = new MockTime();
         this.mockClient = new MockClient(mockTime);
 
-        Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
+        Metadata metadata = new Metadata(retryBackoffMs, 60 * 60 * 1000L, true);
         this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime,
                 retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
         Metrics metrics = new Metrics();
 
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        metadata.update(cluster, Collections.<String>emptySet(), mockTime.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), mockTime.milliseconds());
         this.node = cluster.nodes().get(0);
         mockClient.setNode(node);
 
         this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime);
+        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
     }
 
     @Test
     public void testCoordinatorDiscoveryBackoff() {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -109,8 +122,68 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(0);
+
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            long firstAttemptStartMs = mockTime.milliseconds();
+            Future<Boolean> firstAttempt = executor.submit(() ->
+                    coordinator.joinGroupIfNeeded(REQUEST_TIMEOUT_MS, firstAttemptStartMs));
+
+            mockTime.sleep(REQUEST_TIMEOUT_MS);
+            assertFalse(firstAttempt.get());
+            assertTrue(consumerClient.hasPendingRequests(coordinatorNode));
+
+            mockClient.respond(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+            mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+            long secondAttemptMs = mockTime.milliseconds();
+            Future<Boolean> secondAttempt = executor.submit(() ->
+                    coordinator.joinGroupIfNeeded(REQUEST_TIMEOUT_MS, secondAttemptMs));
+
+            assertTrue(secondAttempt.get());
+        } finally {
+            executor.shutdownNow();
+            executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test
+    public void testJoinGroupRequestTimeout() {
+        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(0);
+
+        RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+
+        mockTime.sleep(REQUEST_TIMEOUT_MS + 1);
+        assertFalse(consumerClient.poll(future, 0));
+
+        mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 5000);
+        assertTrue(consumerClient.poll(future, 0));
+    }
+
+    @Test
+    public void testJoinGroupRequestMaxTimeout() {
+        // Ensure we can handle the maximum allowed rebalance timeout
+
+        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(0);
+
+        RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+        assertFalse(consumerClient.poll(future, 0));
+
+        mockTime.sleep(Integer.MAX_VALUE + 1L);
+        assertTrue(consumerClient.poll(future, 0));
+    }
+
+    @Test
     public void testUncaughtExceptionInHeartbeatThread() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
@@ -170,8 +243,8 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testLookupCoordinator() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+    public void testLookupCoordinator() {
+        setupCoordinator();
 
         mockClient.setNode(null);
         RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator();
@@ -180,16 +253,16 @@ public class AbstractCoordinatorTest {
         mockClient.setNode(node);
         RequestFuture<Void> future = coordinator.lookupCoordinator();
         assertFalse("Request not sent", future.isDone());
-        assertTrue("New request sent while one is in progress", future == coordinator.lookupCoordinator());
+        assertSame("New request sent while one is in progress", future, coordinator.lookupCoordinator());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
-        assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator());
+        assertNotSame("New request not sent after previous completed", future, coordinator.lookupCoordinator());
     }
 
     @Test
     public void testWakeupAfterJoinGroupSent() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -227,7 +300,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -267,7 +340,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupReceived() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -303,7 +376,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -341,7 +414,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupSent() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
@@ -379,7 +452,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
@@ -419,7 +492,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupReceived() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
@@ -455,7 +528,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
@@ -491,7 +564,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupInOnJoinComplete() throws Exception {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         coordinator.wakeupOnJoinComplete = true;
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -522,7 +595,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testAuthenticationErrorInEnsureCoordinatorReady() {
-        setupCoordinator(RETRY_BACKOFF_MS);
+        setupCoordinator();
 
         mockClient.createPendingAuthenticationError(node, 300);
 
@@ -583,9 +656,11 @@ public class AbstractCoordinatorTest {
 
         public DummyCoordinator(ConsumerNetworkClient client,
                                 Metrics metrics,
-                                Time time) {
-            super(new LogContext(), client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS,
-                    HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false);
+                                Time time,
+                                int rebalanceTimeoutMs,
+                                int retryBackoffMs) {
+            super(new LogContext(), client, GROUP_ID, rebalanceTimeoutMs, SESSION_TIMEOUT_MS,
+                    HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, false);
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 32da34a..bd0038d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -100,6 +100,7 @@ public class ConsumerCoordinatorTest {
     private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
     private int autoCommitIntervalMs = 2000;
+    private int requestTimeoutMs = 30000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor);
     private MockTime time;
@@ -126,7 +127,8 @@ public class ConsumerCoordinatorTest {
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.client = new MockClient(time, metadata);
-        this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 1000, Integer.MAX_VALUE);
+        this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100,
+                requestTimeoutMs, Integer.MAX_VALUE);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.mockOffsetCommitCallback = new MockCommitCallback();
@@ -566,7 +568,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -603,9 +605,9 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(cluster, Collections.emptySet());
 
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(2, subscriptions.assignedPartitions().size());
@@ -671,8 +673,8 @@ public class ConsumerCoordinatorTest {
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR));
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
     }
 
     @Test
@@ -698,7 +700,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -721,7 +723,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -750,7 +752,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -937,7 +939,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(t1p), rebalanceListener.revoked);
@@ -957,7 +959,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -975,7 +977,7 @@ public class ConsumerCoordinatorTest {
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
     }
 
     @Test
@@ -1132,7 +1134,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
 
         subscriptions.seek(t1p, 100);
 
@@ -1630,14 +1632,14 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
-        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+        closeVerifyTimeout(coordinator, 1000, 0, 0);
     }
 
     @Test
@@ -1645,14 +1647,14 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
     }
 
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
-        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+        closeVerifyTimeout(coordinator, 1000, 0, 0);
     }
 
     @Test
@@ -1660,7 +1662,7 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
     }
 
     @Test
@@ -1668,27 +1670,27 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
     }
 
     @Test
     public void testCloseNoResponseForCommit() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
     }
 
     @Test
     public void testCloseNoResponseForLeaveGroup() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
     }
 
     @Test
     public void testCloseNoWait() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 0, 60000, 0, 0);
+        closeVerifyTimeout(coordinator, 0, 0, 0);
     }
 
     @Test
@@ -1698,7 +1700,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureActiveGroup();
         time.sleep(heartbeatIntervalMs + 100);
         Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
         Thread[] threads = new Thread[Thread.activeCount()];
         int threadCount = Thread.enumerate(threads);
         for (int i = 0; i < threadCount; i++)
@@ -1736,7 +1738,7 @@ public class ConsumerCoordinatorTest {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-            coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+            coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
         } else
             subscriptions.assignFromUser(singleton(t1p));
 
@@ -1754,9 +1756,11 @@ public class ConsumerCoordinatorTest {
         consumerClient.poll(0);
         assertTrue(coordinator.coordinatorUnknown());
     }
+
     private void closeVerifyTimeout(final ConsumerCoordinator coordinator,
-            final long closeTimeoutMs, final long requestTimeoutMs,
-            long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception {
+                                    final long closeTimeoutMs,
+                                    final long expectedMinTimeMs,
+                                    final long expectedMaxTimeMs) throws Exception {
         ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             boolean coordinatorUnknown = coordinator.coordinatorUnknown();
@@ -1903,7 +1907,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(assignment, Errors.NONE));
-        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds());
     }
 
     private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index d0888fa..d5ec382 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -126,6 +127,26 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void testTimeoutUnsentRequest() {
+        // Delay connection to the node so that the request remains unsent
+        client.delayReady(node, 1000);
+
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat(), 500);
+        consumerClient.pollNoWakeup();
+
+        // Ensure the request is pending, but hasn't been sent
+        assertTrue(consumerClient.hasPendingRequests());
+        assertFalse(client.hasInFlightRequests());
+
+        time.sleep(501);
+        consumerClient.pollNoWakeup();
+
+        assertFalse(consumerClient.hasPendingRequests());
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof TimeoutException);
+    }
+
+    @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
@@ -175,7 +196,7 @@ public class ConsumerNetworkClientTest {
 
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
-                mockNetworkClient, metadata, time, retryBackoffMs, 1000L, Integer.MAX_VALUE);
+                mockNetworkClient, metadata, time, retryBackoffMs, 1000, Integer.MAX_VALUE);
 
         EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
         EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
@@ -273,8 +294,8 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
-    public void sendExpiry() throws InterruptedException {
-        long unsentExpiryMs = 10;
+    public void sendExpiry() {
+        int requestTimeoutMs = 10;
         final AtomicBoolean isReady = new AtomicBoolean();
         final AtomicBoolean disconnected = new AtomicBoolean();
         client = new MockClient(time) {
@@ -291,13 +312,13 @@ public class ConsumerNetworkClientTest {
             }
         };
         // Queue first send, sleep long enough for this to expire and then queue second send
-        consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, unsentExpiryMs, Integer.MAX_VALUE);
+        consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, requestTimeoutMs, Integer.MAX_VALUE);
         RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));
         assertFalse(future1.isDone());
 
-        time.sleep(unsentExpiryMs + 1);
+        time.sleep(requestTimeoutMs + 1);
         RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
         assertEquals(2, consumerClient.pendingRequestCount());
         assertEquals(2, consumerClient.pendingRequestCount(node));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 46666ca..4169550 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1392,7 +1392,7 @@ public class FetcherTest {
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
-    public void testQuotaMetrics() throws Exception {
+    public void testQuotaMetrics() {
         MockSelector selector = new MockSelector(time);
         Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
         Cluster cluster = TestUtils.singletonCluster("test", 1);
@@ -1413,8 +1413,8 @@ public class FetcherTest {
 
         for (int i = 1; i <= 3; i++) {
             int throttleTimeMs = 100 * i;
-            FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<TopicPartition, PartitionData>());
-            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+            FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<>());
+            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
             FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 77005b7..d87c8f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -263,8 +263,8 @@ public class SenderTest {
         for (int i = 1; i <= 3; i++) {
             int throttleTimeMs = 100 * i;
             ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-                            Collections.<TopicPartition, MemoryRecords>emptyMap());
-            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+                            Collections.emptyMap());
+            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
             ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7312f10..d847881 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -460,7 +460,7 @@ object AdminClient {
       metadata,
       time,
       retryBackoffMs,
-      requestTimeoutMs.toLong,
+      requestTimeoutMs,
       Integer.MAX_VALUE)
 
     new AdminClient(
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 60635f1..aedaac7 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -39,7 +39,7 @@ abstract class InterBrokerSendThread(name: String,
   extends ShutdownableThread(name, isInterruptible) {
 
   def generateRequests(): Iterable[RequestAndCompletionHandler]
-  def unsentExpiryMs: Int
+  def requestTimeoutMs: Int
   private val unsentRequests = new UnsentRequests
 
   def hasUnsentRequests = unsentRequests.iterator().hasNext
@@ -57,7 +57,8 @@ abstract class InterBrokerSendThread(name: String,
     generateRequests().foreach { request =>
       val completionHandler = request.handler
       unsentRequests.put(request.destination,
-        networkClient.newClientRequest(request.destination.idString, request.request, now, true, completionHandler))
+        networkClient.newClientRequest(request.destination.idString, request.request, now, true,
+          requestTimeoutMs, completionHandler))
     }
 
     try {
@@ -118,9 +119,9 @@ abstract class InterBrokerSendThread(name: String,
 
   private def failExpiredRequests(now: Long): Unit = {
     // clear all expired unsent requests
-    val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs)
-    for (request <- expiredRequests.asScala) {
-      debug(s"Failed to send the following request after $unsentExpiryMs ms: $request")
+    val timedOutRequests = unsentRequests.removeAllTimedOut(now)
+    for (request <- timedOutRequests.asScala) {
+      debug(s"Failed to send the following request after ${request.requestTimeoutMs} ms: $request")
       completeWithDisconnect(request, now, null)
     }
   }
@@ -152,14 +153,15 @@ private class UnsentRequests {
     requests.add(request)
   }
 
-  def removeExpiredRequests(now: Long, unsentExpiryMs: Long): Collection[ClientRequest] = {
+  def removeAllTimedOut(now: Long): Collection[ClientRequest] = {
     val expiredRequests = new ArrayList[ClientRequest]
     for (requests <- unsent.values.asScala) {
       val requestIterator = requests.iterator
       var foundExpiredRequest = false
       while (requestIterator.hasNext && !foundExpiredRequest) {
         val request = requestIterator.next
-        if (request.createdTimeMs < now - unsentExpiryMs) {
+        val elapsedMs = Math.max(0, now - request.createdTimeMs)
+        if (elapsedMs > request.requestTimeoutMs) {
           expiredRequests.add(request)
           requestIterator.remove()
           foundExpiredRequest = true
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 7059ced..f8b56e8 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -135,7 +135,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
 
-  override val unsentExpiryMs: Int = config.requestTimeoutMs
+  override val requestTimeoutMs: Int = config.requestTimeoutMs
 
   newGauge(
     "UnknownDestinationQueueSize",
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 7106866..6838653 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -32,11 +32,12 @@ class InterBrokerSendThreadTest {
   private val time = new MockTime()
   private val networkClient = EasyMock.createMock(classOf[NetworkClient])
   private val completionHandler = new StubCompletionHandler
+  private val requestTimeoutMs = 1000
 
   @Test
   def shouldNotSendAnythingWhenNoRequests(): Unit = {
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
-      override val unsentExpiryMs: Int = 1000
+      override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs
       override def generateRequests() = mutable.Iterable.empty
     }
 
@@ -58,16 +59,18 @@ class InterBrokerSendThreadTest {
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
-      override val unsentExpiryMs: Int = 1000
+      override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, handler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
+      requestTimeoutMs, handler.handler)
 
     EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
       EasyMock.same(handler.request),
       EasyMock.anyLong(),
       EasyMock.eq(true),
+      EasyMock.eq(requestTimeoutMs),
       EasyMock.same(handler.handler)))
       .andReturn(clientRequest)
 
@@ -93,16 +96,18 @@ class InterBrokerSendThreadTest {
     val node = new Node(1, "", 8080)
     val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
-      override val unsentExpiryMs: Int = 1000
+      override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs
       override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestAndCompletionHandler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
+      requestTimeoutMs, requestAndCompletionHandler.handler)
 
     EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
       EasyMock.same(requestAndCompletionHandler.request),
       EasyMock.anyLong(),
       EasyMock.eq(true),
+      EasyMock.eq(requestTimeoutMs),
       EasyMock.same(requestAndCompletionHandler.handler)))
       .andReturn(clientRequest)
 
@@ -135,17 +140,19 @@ class InterBrokerSendThreadTest {
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
-      override val unsentExpiryMs: Int = 1000
+      override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, handler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true,
+      requestTimeoutMs, handler.handler)
     time.sleep(1500)
 
     EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
       EasyMock.same(handler.request),
       EasyMock.eq(time.milliseconds()),
       EasyMock.eq(true),
+      EasyMock.eq(requestTimeoutMs),
       EasyMock.same(handler.handler)))
       .andReturn(clientRequest)
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7061d6c..6119536 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -105,6 +105,11 @@
         The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added
         for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>,
         <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
+    <li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds.
+        The previous value was a little higher than 5 minutes to account for the maximum time that a rebalance would take.
+        Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from 
+        <code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined
+        by <code>request.timeout.ms</code>.</li>
     <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
     <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message