kafka-commits mailing list archives

From ij...@apache.org
Subject [5/5] kafka git commit: KAFKA-4507; Clients should support older brokers (KIP-97)
Date Wed, 11 Jan 2017 19:31:05 GMT
KAFKA-4507; Clients should support older brokers (KIP-97)

The client should send older versions of requests to the broker if necessary.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2264 from cmccabe/KAFKA-4507
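
In broad strokes, the change has the client fetch each broker's supported version
ranges (via ApiVersionsRequest) and then, per API key, pick the newest version both
sides support. A minimal standalone sketch of that selection rule, in the spirit of
the NodeApiVersions class added below; the version numbers are hypothetical
placeholders, not real protocol metadata:

    public class VersionNegotiationSketch {
        // Mirrors NodeApiVersions: take the newest version the broker supports,
        // capped at the newest version this client supports, and check that the
        // result still satisfies both minimums.
        static Short usableVersion(short clientMin, short clientMax,
                                   short brokerMin, short brokerMax) {
            short version = (short) Math.min(clientMax, brokerMax);
            return (version >= clientMin && version >= brokerMin) ? version : null;
        }

        public static void main(String[] args) {
            // Hypothetical ranges: the client speaks v0-v3, an older broker only v0-v2.
            System.out.println(usableVersion((short) 0, (short) 3, (short) 0, (short) 2)); // 2
            // No overlap: the request is aborted with an UnsupportedVersionException.
            System.out.println(usableVersion((short) 2, (short) 3, (short) 0, (short) 1)); // null
        }
    }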


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d60f1e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d60f1e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d60f1e6

Branch: refs/heads/trunk
Commit: 3d60f1e64350710a5fd2a6036da353cc83195741
Parents: d173395
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Wed Jan 11 19:18:49 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jan 11 19:25:58 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |  45 ++--
 .../apache/kafka/clients/ClientResponse.java    |  11 +-
 .../org/apache/kafka/clients/ClientUtils.java   |  14 -
 .../apache/kafka/clients/ConnectionState.java   |   2 +-
 .../org/apache/kafka/clients/KafkaClient.java   |  54 ++--
 .../org/apache/kafka/clients/NetworkClient.java | 266 +++++++++---------
 .../apache/kafka/clients/NodeApiVersions.java   | 109 ++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |   7 +-
 .../clients/consumer/OffsetAndTimestamp.java    |   5 +-
 .../consumer/internals/AbstractCoordinator.java |  47 ++--
 .../consumer/internals/ConsumerCoordinator.java |  19 +-
 .../internals/ConsumerNetworkClient.java        |  25 +-
 .../clients/consumer/internals/Fetcher.java     | 142 +++++++---
 .../kafka/clients/producer/KafkaProducer.java   |   5 +-
 .../clients/producer/internals/Sender.java      |  16 +-
 .../common/errors/ObsoleteBrokerException.java  |  31 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   6 +-
 .../kafka/common/protocol/ProtoUtils.java       |   4 -
 .../apache/kafka/common/protocol/Protocol.java  |   4 +-
 .../kafka/common/requests/AbstractRequest.java  |  43 ++-
 .../common/requests/ApiVersionsRequest.java     |  38 ++-
 .../common/requests/ApiVersionsResponse.java    |   3 +-
 .../requests/ControlledShutdownRequest.java     |  41 ++-
 .../common/requests/CreateTopicsRequest.java    |  58 +++-
 .../common/requests/DeleteTopicsRequest.java    |  48 +++-
 .../common/requests/DescribeGroupsRequest.java  |  41 ++-
 .../kafka/common/requests/FetchRequest.java     | 106 +++++---
 .../requests/GroupCoordinatorRequest.java       |  44 ++-
 .../kafka/common/requests/HeartbeatRequest.java |  48 +++-
 .../kafka/common/requests/JoinGroupRequest.java | 101 ++++---
 .../common/requests/LeaderAndIsrRequest.java    |  59 +++-
 .../common/requests/LeaveGroupRequest.java      |  46 +++-
 .../common/requests/ListGroupsRequest.java      |  35 ++-
 .../common/requests/ListOffsetRequest.java      | 143 +++++++---
 .../common/requests/ListOffsetResponse.java     |  18 ++
 .../kafka/common/requests/MetadataRequest.java  |  76 +++++-
 .../common/requests/OffsetCommitRequest.java    | 103 +++++--
 .../common/requests/OffsetFetchRequest.java     |  52 ++--
 .../kafka/common/requests/ProduceRequest.java   |  53 +++-
 .../kafka/common/requests/RequestHeader.java    |   5 -
 .../common/requests/SaslHandshakeRequest.java   |  14 +-
 .../common/requests/StopReplicaRequest.java     |  56 +++-
 .../kafka/common/requests/SyncGroupRequest.java |  60 ++++-
 .../common/requests/UpdateMetadataRequest.java  | 110 +++++---
 .../authenticator/SaslClientAuthenticator.java  |   3 +-
 .../org/apache/kafka/common/utils/Utils.java    |  22 ++
 .../org/apache/kafka/clients/MockClient.java    |  32 +--
 .../NetworkClientApiVersionsCheckTest.java      |  89 ------
 .../apache/kafka/clients/NetworkClientTest.java |  93 ++++---
 .../kafka/clients/NodeApiVersionsTest.java      | 101 +++++++
 .../clients/consumer/KafkaConsumerTest.java     |   6 +-
 .../internals/ConsumerNetworkClientTest.java    |  19 +-
 .../clients/consumer/internals/FetcherTest.java |  11 +-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../common/requests/RequestResponseTest.java    | 269 +++++++++----------
 .../authenticator/SaslAuthenticatorTest.java    |  30 ++-
 .../runtime/distributed/WorkerGroupMember.java  |   5 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  20 +-
 .../controller/ControllerChannelManager.scala   |  71 ++---
 .../kafka/controller/KafkaController.scala      |   5 +-
 .../scala/kafka/network/RequestChannel.scala    |   6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  10 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  41 +--
 .../kafka/utils/NetworkClientBlockingOps.scala  |   5 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  44 +--
 .../unit/kafka/network/SocketServerTest.scala   |  10 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |  13 +-
 .../unit/kafka/server/ApiVersionsTest.scala     |   8 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  10 +-
 .../kafka/server/CreateTopicsRequestTest.scala  |  68 ++---
 .../kafka/server/DeleteTopicsRequestTest.scala  |  29 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   3 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  13 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   6 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  18 +-
 .../unit/kafka/server/MetadataRequestTest.scala |  32 +--
 .../unit/kafka/server/ProduceRequestTest.scala  |   6 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  12 +-
 .../server/SaslApiVersionsRequestTest.scala     |  23 +-
 .../processor/internals/StreamsKafkaClient.java |  28 +-
 81 files changed, 2184 insertions(+), 1194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index de6e506..08b8d46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 
@@ -21,41 +22,47 @@ import org.apache.kafka.common.requests.RequestHeader;
 public final class ClientRequest {
 
     private final String destination;
-    private final RequestHeader header;
-    private final AbstractRequest body;
+    private final AbstractRequest.Builder<?> requestBuilder;
+    private final int correlationId;
+    private final String clientId;
     private final long createdTimeMs;
     private final boolean expectResponse;
     private final RequestCompletionHandler callback;
 
     /**
      * @param destination The brokerId to send the request to
+     * @param requestBuilder The builder for the request to make
+     * @param correlationId The correlation id for this client request
+     * @param clientId The client ID to use for the header
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-     * @param header The request's header
-     * @param body The request's body
      * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
      */
     public ClientRequest(String destination,
+                         AbstractRequest.Builder<?> requestBuilder,
+                         int correlationId,
+                         String clientId,
                          long createdTimeMs,
                          boolean expectResponse,
-                         RequestHeader header,
-                         AbstractRequest body,
                          RequestCompletionHandler callback) {
         this.destination = destination;
+        this.requestBuilder = requestBuilder;
+        this.correlationId = correlationId;
+        this.clientId = clientId;
         this.createdTimeMs = createdTimeMs;
-        this.callback = callback;
-        this.header = header;
-        this.body = body;
         this.expectResponse = expectResponse;
+        this.callback = callback;
     }
 
     @Override
     public String toString() {
         return "ClientRequest(expectResponse=" + expectResponse +
             ", callback=" + callback +
-            ", header=" + header +
-            ", body=" + body +
+            ", destination=" + destination +
+            ", correlationId=" + correlationId +
+            ", clientId=" + clientId +
             ", createdTimeMs=" + createdTimeMs +
+            ", requestBuilder=" + requestBuilder +
             ")";
     }
 
@@ -63,12 +70,17 @@ public final class ClientRequest {
         return expectResponse;
     }
 
-    public RequestHeader header() {
-        return header;
+    public ApiKeys apiKey() {
+        return requestBuilder.apiKey();
     }
 
-    public AbstractRequest body() {
-        return body;
+    public RequestHeader makeHeader() {
+        return new RequestHeader(requestBuilder.apiKey().id,
+                requestBuilder.version(), clientId, correlationId);
+    }
+
+    public AbstractRequest.Builder<?> requestBuilder() {
+        return requestBuilder;
     }
 
     public String destination() {
@@ -83,4 +95,7 @@ public final class ClientRequest {
         return createdTimeMs;
     }
 
+    public int correlationId() {
+        return correlationId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 3cd8f1a..2dfbba6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -27,6 +27,7 @@ public class ClientResponse {
     private final long receivedTimeMs;
     private final long latencyMs;
     private final boolean disconnected;
+    private final RuntimeException versionMismatch;
     private final AbstractResponse responseBody;
 
     /**
@@ -36,7 +37,9 @@ public class ClientResponse {
      * @param destination The node the corresponding request was sent to
      * @param receivedTimeMs The unix timestamp when this response was received
      * @param disconnected Whether the client disconnected before fully reading a response
-     * @param responseBody The response contents (or null) if we disconnected or no response was expected
+     * @param versionMismatch The exception raised by a version mismatch that prevented the request from being sent, or null if there was none.
+     * @param responseBody The response contents (or null) if we disconnected, no response was expected,
+     *                     or if there was a version mismatch.
      */
     public ClientResponse(RequestHeader requestHeader,
                           RequestCompletionHandler callback,
@@ -44,6 +47,7 @@ public class ClientResponse {
                           long createdTimeMs,
                           long receivedTimeMs,
                           boolean disconnected,
+                          RuntimeException versionMismatch,
                           AbstractResponse responseBody) {
         this.requestHeader = requestHeader;
         this.callback = callback;
@@ -51,6 +55,7 @@ public class ClientResponse {
         this.receivedTimeMs = receivedTimeMs;
         this.latencyMs = receivedTimeMs - createdTimeMs;
         this.disconnected = disconnected;
+        this.versionMismatch = versionMismatch;
         this.responseBody = responseBody;
     }
 
@@ -62,6 +67,10 @@ public class ClientResponse {
         return disconnected;
     }
 
+    public RuntimeException versionMismatch() {
+        return versionMismatch;
+    }
+
     public RequestHeader requestHeader() {
         return requestHeader;
     }
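
Since a version mismatch now surfaces through the same ClientResponse path as a
disconnect, a response callback has three cases to tell apart. A hedged sketch of
the branching; the class name and log strings are illustrative, and wasDisconnected()
is assumed to be the existing accessor for the disconnected flag:

    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.RequestCompletionHandler;

    public class MismatchAwareHandler implements RequestCompletionHandler {
        @Override
        public void onComplete(ClientResponse response) {
            if (response.versionMismatch() != null) {
                // The request never hit the wire: no mutually supported version.
                System.err.println("Version mismatch: " + response.versionMismatch());
            } else if (response.wasDisconnected()) {
                System.err.println("Disconnected before the response was fully read");
            } else {
                System.out.println("Completed: " + response.requestHeader());
            }
        }
    }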

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index b7ff715..a0f5fab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -15,20 +15,16 @@ package org.apache.kafka.clients;
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.LoginType;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,14 +83,4 @@ public class ClientUtils {
         String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
         return ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, configs, clientSaslMechanism, true);
     }
-
-    public static Collection<ApiVersionsResponse.ApiVersion> buildExpectedApiVersions(Collection<ApiKeys> apiKeys) {
-        List<ApiVersionsResponse.ApiVersion> expectedApiVersions = new ArrayList<>();
-        for (ApiKeys apiKey : apiKeys)
-            expectedApiVersions.add(
-                    // once backwards client compatibility is added, expected min API version for an API should be set to it's min version
-                    new ApiVersionsResponse.ApiVersion(
-                            apiKey.id, Protocol.CURR_VERSION[apiKey.id], Protocol.CURR_VERSION[apiKey.id]));
-        return expectedApiVersions;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
index 288230b..18e7e18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -22,4 +22,4 @@ package org.apache.kafka.clients;
  */
 public enum ConnectionState {
     DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index f171d13..f51d1f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -16,8 +16,7 @@ import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.AbstractRequest;
 
 /**
  * The interface for {@link NetworkClient}
@@ -31,7 +30,7 @@ public interface KafkaClient extends Closeable {
      * @param node The node to check
      * @param now The current timestamp
      */
-    public boolean isReady(Node node, long now);
+    boolean isReady(Node node, long now);
 
     /**
      * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
@@ -41,7 +40,7 @@ public interface KafkaClient extends Closeable {
      * @param now The current time
      * @return true iff we are ready to immediately initiate the sending of another request to the given node.
      */
-    public boolean ready(Node node, long now);
+    boolean ready(Node node, long now);
 
     /**
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
@@ -52,7 +51,7 @@ public interface KafkaClient extends Closeable {
      * @param now The current timestamp
      * @return The number of milliseconds to wait.
      */
-    public long connectionDelay(Node node, long now);
+    long connectionDelay(Node node, long now);
 
     /**
      * Check if the connection of the node has failed, based on the connection state. Such connection failure are
@@ -62,14 +61,14 @@ public interface KafkaClient extends Closeable {
      * @param node the node to check
      * @return true iff the connection has failed and the node is disconnected
      */
-    public boolean connectionFailed(Node node);
+    boolean connectionFailed(Node node);
 
     /**
      * Queue up the given request for sending. Requests can only be sent on ready connections.
      * @param request The request
      * @param now The current timestamp
      */
-    public void send(ClientRequest request, long now);
+    void send(ClientRequest request, long now);
 
     /**
      * Do actual reads and writes from sockets.
@@ -80,14 +79,14 @@ public interface KafkaClient extends Closeable {
      * @param now The current time in ms
      * @throws IllegalStateException If a request is sent to an unready node
      */
-    public List<ClientResponse> poll(long timeout, long now);
+    List<ClientResponse> poll(long timeout, long now);
 
     /**
      * Closes the connection to a particular node (if there is one).
      *
      * @param nodeId The id of the node
      */
-    public void close(String nodeId);
+    void close(String nodeId);
 
     /**
      * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
@@ -97,39 +96,46 @@ public interface KafkaClient extends Closeable {
      * @param now The current time in ms
      * @return The node with the fewest in-flight requests.
      */
-    public Node leastLoadedNode(long now);
+    Node leastLoadedNode(long now);
 
     /**
      * The number of currently in-flight requests for which we have not yet returned a response
      */
-    public int inFlightRequestCount();
+    int inFlightRequestCount();
 
     /**
      * Get the total in-flight requests for a particular node
      * 
      * @param nodeId The id of the node
      */
-    public int inFlightRequestCount(String nodeId);
+    int inFlightRequestCount(String nodeId);
 
     /**
-     * Generate a request header for the next request
-     * 
-     * @param key The API key of the request
+     * Wake up the client if it is currently blocked waiting for I/O
      */
-    public RequestHeader nextRequestHeader(ApiKeys key);
+    void wakeup();
 
     /**
-     * Generate a request header for the given API key
+     * Create a new ClientRequest.
      *
-     * @param key The api key
-     * @param version The api version
-     * @return A request header with the appropriate client id and correlation id
+     * @param nodeId the node to send to
+     * @param requestBuilder the request builder to use
+     * @param createdTimeMs the time in milliseconds to use as the creation time of the request
+     * @param expectResponse true iff we expect a response
      */
-    public RequestHeader nextRequestHeader(ApiKeys key, short version);
+    ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,
+                                   long createdTimeMs, boolean expectResponse);
 
     /**
-     * Wake up the client if it is currently blocked waiting for I/O
+     * Create a new ClientRequest.
+     *
+     * @param nodeId the node to send to
+     * @param requestBuilder the request builder to use
+     * @param createdTimeMs the time in milliseconds to use as the creation time of the request
+     * @param expectResponse true iff we expect a response
+     * @param callback the callback to invoke when we get a response
      */
-    public void wakeup();
-
+    ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,
+                                          long createdTimeMs, boolean expectResponse,
+                                          RequestCompletionHandler callback);
 }
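
With the nextRequestHeader methods gone, callers hand the client an unversioned
builder and let NetworkClient pin the version at send time. A sketch of the intended
call pattern; the wrapper class, node id "1", and topic name are placeholders:

    import java.util.Collections;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.KafkaClient;
    import org.apache.kafka.common.requests.MetadataRequest;

    public class BuilderSendSketch {
        // 'client' is an already-constructed NetworkClient ready for node "1".
        static void sendMetadata(KafkaClient client, long now) {
            MetadataRequest.Builder builder =
                    new MetadataRequest.Builder(Collections.singletonList("my-topic"));
            // No version or header is chosen here: doSend() later calls
            // builder.setVersion() with the best version the broker advertised.
            ClientRequest request = client.newClientRequest("1", builder, now, true);
            client.send(request, now);
        }
    }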

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c7728b1..5e97eac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -13,8 +13,9 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.ObsoleteBrokerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
@@ -22,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -38,10 +40,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -56,11 +58,6 @@ import java.util.Set;
 public class NetworkClient implements KafkaClient {
 
     private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
-    private static final Metadata NO_METADATA = null;
-    private static final MetadataUpdater NO_METADATA_UPDATER = null;
-
-    /* apis used by this client. If this is null, client won't perform api version check against connecting brokers */
-    private final Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions;
 
     /* the selector used to perform network i/o */
     private final Selectable selector;
@@ -95,23 +92,16 @@ public class NetworkClient implements KafkaClient {
 
     private final Time time;
 
-    private final Map<Integer, Collection<ApiVersionsResponse.ApiVersion>> nodeApiVersions = new HashMap<>();
+    /**
+     * True if we should send an ApiVersionsRequest when first connecting to a broker.
+     */
+    private final boolean discoverBrokerVersions;
+
+    private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
 
     private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
 
-    public NetworkClient(Selectable selector,
-                         Metadata metadata,
-                         String clientId,
-                         int maxInFlightRequestsPerConnection,
-                         long reconnectBackoffMs,
-                         int socketSendBuffer,
-                         int socketReceiveBuffer,
-                         int requestTimeoutMs,
-                         Time time,
-                         Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
-        this(NO_METADATA_UPDATER, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, requiredApiVersions);
-    }
+    private final List<ClientResponse> abortedSends = new LinkedList<>();
 
     public NetworkClient(Selectable selector,
                          Metadata metadata,
@@ -121,23 +111,10 @@ public class NetworkClient implements KafkaClient {
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
-                         Time time) {
-        this(NO_METADATA_UPDATER, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, null);
-    }
-
-    public NetworkClient(Selectable selector,
-                         MetadataUpdater metadataUpdater,
-                         String clientId,
-                         int maxInFlightRequestsPerConnection,
-                         long reconnectBackoffMs,
-                         int socketSendBuffer,
-                         int socketReceiveBuffer,
-                         int requestTimeoutMs,
                          Time time,
-                         Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
-        this(metadataUpdater, NO_METADATA, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, requiredApiVersions);
+                         boolean discoverBrokerVersions) {
+        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
     }
 
     public NetworkClient(Selectable selector,
@@ -148,9 +125,10 @@ public class NetworkClient implements KafkaClient {
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
-                         Time time) {
-        this(metadataUpdater, NO_METADATA, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, null);
+                         Time time,
+                         boolean discoverBrokerVersions) {
+        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -163,10 +141,7 @@ public class NetworkClient implements KafkaClient {
                           int socketReceiveBuffer,
                           int requestTimeoutMs,
                           Time time,
-                          Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
-
-        this.requiredApiVersions = requiredApiVersions;
-
+                          boolean discoverBrokerVersions) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
          * super constructor is invoked.
@@ -189,6 +164,7 @@ public class NetworkClient implements KafkaClient {
         this.requestTimeoutMs = requestTimeoutMs;
         this.reconnectBackoffMs = reconnectBackoffMs;
         this.time = time;
+        this.discoverBrokerVersions = discoverBrokerVersions;
     }
 
     /**
@@ -278,15 +254,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Are we connected and able to send API versions request to the given connection?
-     *
-     * @param node The node
-     */
-    private boolean canSendApiVersionsRequest(String node) {
-        return this.requiredApiVersions != null && nodesNeedingApiVersionsFetch.contains(node);
-    }
-
-    /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      * @param request The request
      * @param now The current timestamp
@@ -296,31 +263,69 @@ public class NetworkClient implements KafkaClient {
         doSend(request, false, now);
     }
 
-    private void sendInternalMetadataRequest(MetadataRequest metadataRequest, String nodeConnectionId, long now) {
-        ClientRequest clientRequest = new ClientRequest(nodeConnectionId, now, true,
-                nextRequestHeader(ApiKeys.METADATA), metadataRequest, null);
+    private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
+                                             String nodeConnectionId, long now) {
+        ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
         doSend(clientRequest, true, now);
     }
 
-    private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
-        String nodeId = request.destination();
-        if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) {
-            if (!canSendApiVersionsRequest(nodeId))
-                throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready.");
-        } else if (!canSendRequest(nodeId))
-            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-
-        Send send = request.body().toSend(nodeId, request.header());
+    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
+        String nodeId = clientRequest.destination();
+        if (!isInternalRequest) {
+            // If this request came from outside the NetworkClient, validate
+            // that we can send data.  If the request is internal, we trust
+            // that the internal code has done this validation.  Validation
+            // will be slightly different for some internal requests (for
+            // example, ApiVersionsRequests can be sent prior to being in
+            // READY state.)
+            if (!canSendRequest(nodeId))
+                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
+        }
+        AbstractRequest request = null;
+        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
+        try {
+            NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
+            // Note: if versionInfo is null, we have no server version information. This would be
+            // the case when sending the initial ApiVersionsRequest, which fetches the version
+            // information itself.  It is also the case when discoverBrokerVersions is set to false.
+            if (versionInfo == null) {
+                if ((!discoverBrokerVersions) && (log.isTraceEnabled()))
+                    log.trace("No version information found when sending message of type {} to node {}",
+                            clientRequest.apiKey(), nodeId);
+            } else {
+                short version = versionInfo.usableVersion(clientRequest.apiKey());
+                if (log.isTraceEnabled())
+                    log.trace("When sending message of type {} to node {}, the best usable version is {}",
+                            clientRequest.apiKey(), nodeId, version);
+                builder.setVersion(version);
+            }
+            // The call to build may also throw UnsupportedVersionException, if there are essential
+            // fields that cannot be represented in the chosen version.
+            request = builder.build();
+        } catch (ObsoleteBrokerException | UnsupportedVersionException e) {
+            // If the version is not supported, skip sending the request over the wire.
+            // Instead, simply add it to the local queue of aborted requests.
+            log.debug("Version mismatch when attempting to send {} to {}",
+                    clientRequest.toString(), clientRequest.destination(), e);
+            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
+                    clientRequest.callback(), clientRequest.destination(), now, now,
+                    false, e, null);
+            abortedSends.add(clientResponse);
+            return;
+        }
+        RequestHeader header = clientRequest.makeHeader();
+        if (log.isTraceEnabled())
+            log.trace("Sending {} to node {}", request,  nodeId);
+        Send send = request.toSend(nodeId, header);
         InFlightRequest inFlightRequest = new InFlightRequest(
-                request.header(),
-                request.createdTimeMs(),
-                request.destination(),
-                request.callback(),
-                request.expectResponse(),
+                header,
+                clientRequest.createdTimeMs(),
+                clientRequest.destination(),
+                clientRequest.callback(),
+                clientRequest.expectResponse(),
                 isInternalRequest,
                 send,
                 now);
-
         this.inFlightRequests.add(inFlightRequest);
         selector.send(inFlightRequest.send);
     }
@@ -346,10 +351,12 @@ public class NetworkClient implements KafkaClient {
         // process completed actions
         long updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
+        handleAbortedSends(responses);
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
         handleDisconnections(responses, updatedNow);
-        handleConnections(updatedNow);
+        handleConnections();
+        handleInitiateApiVersionRequests(updatedNow);
         handleTimedOutRequests(responses, updatedNow);
 
         // invoke callbacks
@@ -381,29 +388,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Generate a request header for the given API key
-     *
-     * @param key The api key
-     * @return A request header with the appropriate client id and correlation id
-     */
-    @Override
-    public RequestHeader nextRequestHeader(ApiKeys key) {
-        return new RequestHeader(key.id, clientId, correlation++);
-    }
-
-    /**
-     * Generate a request header for the given API key and version
-     *
-     * @param key The api key
-     * @param version The api version
-     * @return A request header with the appropriate client id and correlation id
-     */
-    @Override
-    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
-        return new RequestHeader(key.id, version, clientId, correlation++);
-    }
-
-    /**
      * Interrupt the client if it is blocked waiting on I/O.
      */
     @Override
@@ -498,10 +482,15 @@ public class NetworkClient implements KafkaClient {
         }
 
         // we disconnected, so we should probably refresh our metadata
-        if (nodeIds.size() > 0)
+        if (!nodeIds.isEmpty())
             metadataUpdater.requestUpdate();
     }
 
+    private void handleAbortedSends(List<ClientResponse> responses) {
+        responses.addAll(abortedSends);
+        abortedSends.clear();
+    }
+
     /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
      *
@@ -534,30 +523,28 @@ public class NetworkClient implements KafkaClient {
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
             else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
-                handleApiVersionsResponse(req, (ApiVersionsResponse) body);
+                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
             else
                 responses.add(req.completed(body, now));
         }
     }
 
-    private void handleApiVersionsResponse(InFlightRequest req, ApiVersionsResponse apiVersionsResponse) {
+    private void handleApiVersionsResponse(List<ClientResponse> responses,
+                                           InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
         final String node = req.destination;
-        for (ApiVersionsResponse.ApiVersion requiredApiVersion : requiredApiVersions) {
-            final ApiVersionsResponse.ApiVersion supportedApiVersion = apiVersionsResponse.apiVersion(requiredApiVersion.apiKey);
-            if (supportedApiVersion == null) {
-                close(node);
-                throw new KafkaException("Node " + req.destination + " does not support Api " + requiredApiVersion.apiKey + ".");
-            }
-            if (supportedApiVersion.maxVersion < requiredApiVersion.minVersion || supportedApiVersion.minVersion > requiredApiVersion.maxVersion) {
-                close(node);
-                throw new KafkaException("Node " + req.destination + " does not support required versions for Api " + requiredApiVersion.apiKey + "." +
-                        " Supported versions: " + "[" + supportedApiVersion.minVersion + ", " + supportedApiVersion.maxVersion + "]" + "," +
-                        " required versions: " + "[" + requiredApiVersion.minVersion + ", " + requiredApiVersion.maxVersion + "]" + ".");
-            }
+        if (apiVersionsResponse.errorCode() != Errors.NONE.code()) {
+            log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
+                    node, Errors.forCode(apiVersionsResponse.errorCode()));
+            this.selector.close(node);
+            processDisconnection(responses, node, now);
+            return;
         }
-        nodeApiVersions.put(Integer.parseInt(node), apiVersionsResponse.apiVersions());
+        NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
+        nodeApiVersions.put(node, nodeVersionInfo);
         this.connectionStates.ready(node);
-        log.debug("Recorded API versions for node {}: {}", node, apiVersionsResponse.apiVersions());
+        if (log.isDebugEnabled()) {
+            log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
+        }
     }
 
     /**
@@ -579,32 +566,31 @@ public class NetworkClient implements KafkaClient {
     /**
      * Record any newly completed connections
      */
-    private void handleConnections(long now) {
+    private void handleConnections() {
         for (String node : this.selector.connected()) {
-            log.debug("Completed connection to node {}", node);
-            // Though the node is connected, we might not still be able to send requests. For instance,
-            // in case of SSL connection, SSL handshake happens after connection is established.
-            if (this.requiredApiVersions == null) {
-                this.connectionStates.ready(node);
-            } else {
+            // We are now connected.  Note that we might not yet be able to send requests. For instance,
+            // if SSL is enabled, the SSL handshake happens after the connection is established.
+            // Therefore, it is still necessary to check isChannelReady before attempting to send on this
+            // connection.
+            if (discoverBrokerVersions) {
                 this.connectionStates.checkingApiVersions(node);
                 nodesNeedingApiVersionsFetch.add(node);
+                log.debug("Completed connection to node {}.  Fetching API versions.", node);
+            } else {
+                this.connectionStates.ready(node);
+                log.debug("Completed connection to node {}.  Ready.", node);
             }
         }
-        maybeInitiateApiVersionsFetch(now);
     }
 
-    private void maybeInitiateApiVersionsFetch(long now) {
-        if (this.requiredApiVersions == null)
-            return;
-
+    private void handleInitiateApiVersionRequests(long now) {
         Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator();
         while (iter.hasNext()) {
             String node = iter.next();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                 log.debug("Initiating API versions fetch from node {}.", node);
-                ClientRequest clientRequest = new ClientRequest(node, now, true,
-                        nextRequestHeader(ApiKeys.API_VERSIONS), ApiVersionsRequest.API_VERSIONS_REQUEST, null);
+                ApiVersionsRequest.Builder apiVersionRequest = new ApiVersionsRequest.Builder();
+                ClientRequest clientRequest = newClientRequest(node, apiVersionRequest, now, true);
                 doSend(clientRequest, true, now);
                 iter.remove();
             }
@@ -743,11 +729,12 @@ public class NetworkClient implements KafkaClient {
 
             if (canSendRequest(nodeConnectionId)) {
                 this.metadataFetchInProgress = true;
-                MetadataRequest metadataRequest;
+                MetadataRequest.Builder metadataRequest;
                 if (metadata.needMetadataForAllTopics())
-                    metadataRequest = MetadataRequest.allTopics();
+                    metadataRequest = MetadataRequest.Builder.allTopics();
                 else
-                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
+                    metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
+
 
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
@@ -778,6 +765,19 @@ public class NetworkClient implements KafkaClient {
 
     }
 
+    @Override
+    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+                                          boolean expectResponse) {
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);
+    }
+
+    @Override
+    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+                                          boolean expectResponse, RequestCompletionHandler callback) {
+        return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
+                callback);
+    }
+
     static class InFlightRequest {
         final RequestHeader header;
         final String destination;
@@ -807,13 +807,15 @@ public class NetworkClient implements KafkaClient {
         }
 
         public ClientResponse completed(AbstractResponse response, long timeMs) {
-            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, response);
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, null, response);
         }
 
         public ClientResponse disconnected(long timeMs) {
-            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null);
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null, null);
         }
-
     }
 
+    public boolean discoverBrokerVersions() {
+        return discoverBrokerVersions;
+    }
 }
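
Note how aborted sends reach the caller: doSend() queues a ClientResponse carrying
the exception instead of writing to the socket, and the next poll() drains that queue
ahead of any network results. An illustrative caller-side fragment, assuming the
imports shown in the sketches above:

    static void sendAndCheck(KafkaClient client, ClientRequest request, long now) {
        client.send(request, now);   // may be aborted internally on a version mismatch
        for (ClientResponse response : client.poll(100, now)) {
            // Aborted sends arrive here via handleAbortedSends(), before I/O results
            if (response.versionMismatch() != null)
                throw response.versionMismatch();
        }
    }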

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
new file mode 100644
index 0000000..6acbb63
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class NodeApiVersions {
+    private final Collection<ApiVersion> apiVersions;
+
+    // A map of the usable version of each API, keyed by ApiKeys.
+    private final Map<ApiKeys, Short> usableVersions = new EnumMap<>(ApiKeys.class);
+
+    public NodeApiVersions(Collection<ApiVersion> apiVersions) {
+        this.apiVersions = apiVersions;
+        for (ApiVersion apiVersion : apiVersions) {
+            int apiKeyId = apiVersion.apiKey;
+            // Newer brokers may support ApiKeys we don't know about; ignore them
+            if (ApiKeys.hasId(apiKeyId)) {
+                short version = Utils.min(ProtoUtils.latestVersion(apiKeyId), apiVersion.maxVersion);
+                if (version >= apiVersion.minVersion && version >= ProtoUtils.oldestVersion(apiKeyId))
+                    usableVersions.put(ApiKeys.forId(apiKeyId), version);
+            }
+        }
+    }
+
+    /**
+     * Return the most recent version supported by both the client and the server.
+     */
+    public short usableVersion(ApiKeys apiKey) {
+        Short usableVersion = usableVersions.get(apiKey);
+        if (usableVersion == null) {
+            throw new UnsupportedVersionException("The client cannot send an " +
+                    "API request of type " + apiKey + ", because the " +
+                    "server does not understand any of the versions this client supports.");
+        }
+        return usableVersion;
+    }
+
+    /**
+     * This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
+     */
+    @Override
+    public String toString() {
+        // The apiVersion collection may not be in sorted order.  We put it into
+        // a TreeMap before printing it out to ensure that we always print in
+        // ascending order.
+        TreeMap<Short, String> apiKeysText = new TreeMap<>();
+        for (ApiVersion apiVersion : this.apiVersions)
+            apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
+
+        // Also handle the case where some apiKey types are
+        // unknown, which may happen when either the client or server is newer.
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            if (!apiKeysText.containsKey(apiKey.id)) {
+                StringBuilder bld = new StringBuilder();
+                bld.append(apiKey.name).append("(").
+                    append(apiKey.id).append("): ").append("UNSUPPORTED");
+                apiKeysText.put(apiKey.id, bld.toString());
+            }
+        }
+        return "{" + Utils.join(apiKeysText.values(), ", ") + "}";
+    }
+
+    private String apiVersionToText(ApiVersion apiVersion) {
+        StringBuilder bld = new StringBuilder();
+        ApiKeys apiKey = null;
+        if (ApiKeys.hasId(apiVersion.apiKey)) {
+            apiKey = ApiKeys.forId(apiVersion.apiKey);
+        }
+        if (apiKey != null) {
+            bld.append(apiKey.name).append("(").append(apiKey.id).append("): ");
+        } else {
+            bld.append("UNKNOWN(").append(apiKey.id).append("): ");
+        }
+        if (apiVersion.minVersion == apiVersion.maxVersion) {
+            bld.append(apiVersion.minVersion);
+        } else {
+            bld.append(apiVersion.minVersion).append(" to ").append(apiVersion.maxVersion);
+        }
+        if (apiKey != null) {
+            Short usableVersion = usableVersions.get(apiKey);
+            if (usableVersion == null) {
+                bld.append(" [usable: NONE]");
+            } else {
+                bld.append(" [usable: ").append(usableVersion).append("]");
+            }
+        }
+        return bld.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7547c6d..04fe789 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
@@ -519,7 +518,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             ApiKeys.OFFSET_COMMIT,
             ApiKeys.OFFSET_FETCH,
             ApiKeys.SYNC_GROUP);
-    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(CONSUMER_APIS);
 
     private final String clientId;
     private final ConsumerCoordinator coordinator;
@@ -673,7 +671,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
-                    EXPECTED_API_VERSIONS);
+                    true);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
@@ -1351,7 +1349,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (parts != null)
                 return parts;
 
-            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topic)), requestTimeoutMs);
+            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
+                    new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs);
             return topicMetadata.get(topic);
         } finally {
             release();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
index 32baf38..f74a333 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
@@ -21,7 +21,10 @@ import org.apache.kafka.common.utils.Utils;
 /**
  * A container class for offset and timestamp.
  *
- * Both offset and timestamp are non-negative.
+ * Offset must be non-negative.
+ *
+ * The timestamp is negative only when it is invalid.  This can happen when
+ * handling a response from a broker that doesn't support KIP-79.
  */
 public final class OffsetAndTimestamp {
     private final long timestamp;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cdea51f..1af324e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.GroupCoordinatorRequest;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
@@ -380,16 +379,15 @@ public abstract class AbstractCoordinator implements Closeable {
 
         // send a join group request to the coordinator
         log.info("(Re-)joining group {}", groupId);
-        JoinGroupRequest request = new JoinGroupRequest(
+        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                 groupId,
                 this.sessionTimeoutMs,
-                this.rebalanceTimeoutMs,
                 this.generation.memberId,
                 protocolType(),
-                metadata());
+                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
 
-        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
-        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
+        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
+        return client.send(coordinator, requestBuilder)
                 .compose(new JoinGroupResponseHandler());
     }
 
@@ -450,10 +448,12 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private RequestFuture<ByteBuffer> onJoinFollower() {
         // send follower's sync group with an empty assignment
-        SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId,
-                generation.memberId, Collections.<String, ByteBuffer>emptyMap());
-        log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
-        return sendSyncGroupRequest(request);
+        SyncGroupRequest.Builder requestBuilder =
+                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
+                        Collections.<String, ByteBuffer>emptyMap());
+        log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator,
+                requestBuilder);
+        return sendSyncGroupRequest(requestBuilder);
     }
 
     private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
@@ -462,18 +462,20 @@ public abstract class AbstractCoordinator implements Closeable {
             Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                     joinResponse.members());
 
-            SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, groupAssignment);
-            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
-            return sendSyncGroupRequest(request);
+            SyncGroupRequest.Builder requestBuilder =
+                    new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
+            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}",
+                    groupId, this.coordinator, requestBuilder);
+            return sendSyncGroupRequest(requestBuilder);
         } catch (RuntimeException e) {
             return RequestFuture.failure(e);
         }
     }
 
-    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
+    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
-        return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
+        return client.send(coordinator, requestBuilder)
                 .compose(new SyncGroupResponseHandler());
     }
 
@@ -518,8 +520,9 @@ public abstract class AbstractCoordinator implements Closeable {
     private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
         log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
-        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
-        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
+        GroupCoordinatorRequest.Builder requestBuilder =
+                new GroupCoordinatorRequest.Builder(this.groupId);
+        return client.send(node, requestBuilder)
                      .compose(new GroupCoordinatorResponseHandler());
     }
 
@@ -632,8 +635,9 @@ public abstract class AbstractCoordinator implements Closeable {
         if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            LeaveGroupRequest request = new LeaveGroupRequest(groupId, generation.memberId);
-            client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
+            LeaveGroupRequest.Builder request =
+                    new LeaveGroupRequest.Builder(groupId, generation.memberId);
+            client.send(coordinator, request)
                     .compose(new LeaveGroupResponseHandler());
             client.pollNoWakeup();
         }
@@ -657,8 +661,9 @@ public abstract class AbstractCoordinator implements Closeable {
 
     // visible for testing
     synchronized RequestFuture<Void> sendHeartbeatRequest() {
-        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
-        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
+        HeartbeatRequest.Builder requestBuilder =
+                new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
+        return client.send(coordinator, requestBuilder)
                 .compose(new HeartbeatResponseHandler());
     }
 

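Every coordinator RPC above follows the same migration: the raw request plus an
explicit ApiKeys constant is replaced by a Builder carrying only logical parameters,
so the network layer can pick a wire version per connection. A hypothetical,
simplified sketch of that deferred-version idea (not the actual
AbstractRequest.Builder, which is defined elsewhere in this patch):

    // Hypothetical sketch: callers supply parameters and an acceptable
    // version range; the network layer calls build() at send time with a
    // version the destination broker is known to support.
    abstract class VersionedBuilderSketch<T> {
        private final short minVersion;
        private final short maxVersion;

        VersionedBuilderSketch(short minVersion, short maxVersion) {
            this.minVersion = minVersion;
            this.maxVersion = maxVersion;
        }

        short minVersion() { return minVersion; }
        short maxVersion() { return maxVersion; }

        // Invoked at send time, once the broker's supported range is known.
        abstract T build(short version);
    }

This keeps version selection out of callers like AbstractCoordinator entirely; they
no longer need to know which JoinGroup or Heartbeat version the broker understands.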
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a122575..9006d70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -616,16 +615,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (generation == null)
             return RequestFuture.failure(new CommitFailedException());
 
-        OffsetCommitRequest req = new OffsetCommitRequest(
-                this.groupId,
-                generation.generationId,
-                generation.memberId,
-                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
-                offsetData);
+        OffsetCommitRequest.Builder builder =
+                new OffsetCommitRequest.Builder(this.groupId, offsetData).
+                        setGenerationId(generation.generationId).
+                        setMemberId(generation.memberId).
+                        setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
 
         log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
 
-        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
+        return client.send(coordinator, builder)
                 .compose(new OffsetCommitResponseHandler(offsets));
     }
 
@@ -719,10 +717,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
         // construct the request
-        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
+        OffsetFetchRequest.Builder requestBuilder =
+                new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions));
 
         // send the request with a callback
-        return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
+        return client.send(coordinator, requestBuilder)
                 .compose(new OffsetFetchResponseHandler());
     }
 

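The OffsetCommitRequest.Builder above also shows the constructor-vs-setter
convention used throughout the patch: fields present in every protocol version are
constructor arguments, while fields introduced in later versions become fluent
setters with defaults that keep older versions expressible. A sketch of that
convention (hypothetical class; the per-version annotations are assumptions based
on the OffsetCommit schema history):

    // Hypothetical sketch: required-since-v0 fields in the constructor,
    // later-version fields as chainable setters with benign defaults.
    final class CommitBuilderSketch {
        private final String groupId;       // present since v0
        private String memberId = "";       // added in v1
        private long retentionTime = -1L;   // added in v2; -1 = broker default

        CommitBuilderSketch(String groupId) {
            this.groupId = groupId;
        }

        CommitBuilderSketch setMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        CommitBuilderSketch setRetentionTime(long retentionTime) {
            this.retentionTime = retentionTime;
            return this;
        }

        @Override
        public String toString() {
            return "CommitBuilderSketch(groupId=" + groupId + ", memberId=" +
                    memberId + ", retentionTime=" + retentionTime + ")";
        }
    }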
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b09b787..e91a159 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
@@ -86,25 +85,17 @@ public class ConsumerNetworkClient implements Closeable {
      * Use the returned future to obtain the result of the send. Note that there is no
      * need to check for disconnects explicitly on the {@link ClientResponse} object;
      * instead, the future will be failed with a {@link DisconnectException}.
+     *
      * @param node The destination of the request
-     * @param api The Kafka API call
-     * @param request The request payload
+     * @param requestBuilder A builder for the request payload
      * @return A future which indicates the result of the send.
      */
     public RequestFuture<ClientResponse> send(Node node,
-                                              ApiKeys api,
-                                              AbstractRequest request) {
-        return send(node, api, ProtoUtils.latestVersion(api.id), request);
-    }
-
-    private RequestFuture<ClientResponse> send(Node node,
-                                               ApiKeys api,
-                                               short version,
-                                               AbstractRequest request) {
+                                              AbstractRequest.Builder<?> requestBuilder) {
         long now = time.milliseconds();
         RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
-        RequestHeader header = client.nextRequestHeader(api, version);
-        ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler);
+        ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
+                completionHandler);
         put(node, clientRequest);
 
         // wakeup the client in case it is blocking in poll so that we can send the queued request
@@ -343,8 +334,8 @@ public class ConsumerNetworkClient implements Closeable {
                 iterator.remove();
                 for (ClientRequest request : requestEntry.getValue()) {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
-                    handler.onComplete(new ClientResponse(request.header(), request.callback(), request.destination(),
-                            request.createdTimeMs(), now, true, null));
+                    handler.onComplete(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                            request.createdTimeMs(), now, true, null, null));
                 }
             }
         }
@@ -486,6 +477,8 @@ public class ConsumerNetworkClient implements Closeable {
                 log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                         api, requestHeader, correlation, response.destination());
                 future.raise(DisconnectException.INSTANCE);
+            } else if (response.versionMismatch() != null) {
+                future.raise(response.versionMismatch());
             } else {
                 future.complete(response);
             }

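The new versionMismatch() branch is the client-side surface of KIP-97: when no
mutually supported protocol version exists, the failure reaches the caller through
the future as an exception rather than as a malformed response. A self-contained
sketch of the three-way completion, with a hypothetical stand-in for the consumer's
internal RequestFuture type:

    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.common.errors.DisconnectException;

    final class CompletionSketch {
        // Hypothetical stand-in for the internal RequestFuture API.
        interface FutureSketch {
            void raise(RuntimeException e);
            void complete(ClientResponse response);
        }

        // Disconnects and version mismatches both fail the future; only
        // clean responses complete it normally. This assumes, as in the
        // hunk above, that versionMismatch() yields a RuntimeException
        // subtype when set and null otherwise.
        static void complete(ClientResponse response, FutureSketch future) {
            if (response.wasDisconnected())
                future.raise(DisconnectException.INSTANCE);
            else if (response.versionMismatch() != null)
                future.raise(response.versionMismatch());
            else
                future.complete(response);
        }
    }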
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 22588a8..48db458 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
@@ -134,7 +133,7 @@ public class Fetcher<K, V> {
         return !completedFetches.isEmpty();
     }
 
-    private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse response) {
+    private boolean matchesRequestedPartitions(FetchRequest.Builder request, FetchResponse response) {
         Set<TopicPartition> requestedPartitions = request.fetchData().keySet();
         Set<TopicPartition> fetchedPartitions = response.responseData().keySet();
         return fetchedPartitions.equals(requestedPartitions);
@@ -146,12 +145,12 @@ public class Fetcher<K, V> {
      * @return number of fetches sent
      */
     public int sendFetches() {
-        Map<Node, FetchRequest> fetchRequestMap = createFetchRequests();
-        for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
-            final FetchRequest request = fetchEntry.getValue();
+        Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
+        for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
+            final FetchRequest.Builder request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
-            client.send(fetchTarget, ApiKeys.FETCH, request)
+            client.send(fetchTarget, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
@@ -231,7 +230,7 @@ public class Fetcher<K, V> {
      * @return The map of topics with their partition information
      */
     public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
-        return getTopicMetadata(MetadataRequest.allTopics(), timeout);
+        return getTopicMetadata(MetadataRequest.Builder.allTopics(), timeout);
     }
 
     /**
@@ -241,7 +240,7 @@ public class Fetcher<K, V> {
      * @param timeout time for which getting topic metadata is attempted
      * @return The map of topics with their partition information
      */
-    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest request, long timeout) {
+    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, long timeout) {
         // Save the round trip if no topics are requested.
         if (!request.isAllTopics() && request.topics().isEmpty())
             return Collections.emptyMap();
@@ -315,12 +314,12 @@ public class Fetcher<K, V> {
      * Send Metadata Request to least loaded node in Kafka cluster asynchronously
      * @return A future that indicates result of sent metadata request
      */
-    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest request) {
+    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder request) {
         final Node node = client.leastLoadedNode();
         if (node == null)
             return RequestFuture.noBrokersAvailable();
         else
-            return client.send(node, ApiKeys.METADATA, request);
+            return client.send(node, request);
     }
 
     /**
@@ -331,31 +330,60 @@ public class Fetcher<K, V> {
      */
     private void resetOffset(TopicPartition partition) {
         OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
-        final long timestamp;
-        if (strategy == OffsetResetStrategy.EARLIEST)
-            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
-        else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException(partition);
-
         log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
-        long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp), Long.MAX_VALUE).get(partition).offset();
-
+        long offset = listOffset(partition, strategy);
         // we might lose the assignment while fetching the offset, so check it is still active
         if (subscriptions.isAssigned(partition))
             this.subscriptions.seek(partition, offset);
     }
 
+    private long listOffset(TopicPartition partition, OffsetResetStrategy strategy) {
+        final long timestamp;
+        switch (strategy) {
+            case EARLIEST:
+                timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
+                break;
+            case LATEST:
+                timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
+                break;
+            default:
+                throw new NoOffsetForPartitionException(partition);
+        }
+        while (true) {
+            RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
+                    sendListOffsetRequests(false, Collections.singletonMap(partition, timestamp));
+            client.poll(future);
+            if (future.succeeded()) {
+                OffsetAndTimestamp offsetAndTimestamp = future.value().get(partition);
+                if (offsetAndTimestamp == null)
+                    throw new NoOffsetForPartitionException(partition);
+                return offsetAndTimestamp.offset();
+            }
+            if (!future.isRetriable())
+                throw future.exception();
+            if (future.exception() instanceof InvalidMetadataException)
+                client.awaitMetadataUpdate();
+            else
+                time.sleep(retryBackoffMs);
+        }
+    }
+
+
     public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                      long timeout) {
+        return retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
+    }
+
+    private Map<TopicPartition, OffsetAndTimestamp> retrieveOffsetsByTimes(
+            Map<TopicPartition, Long> timestampsToSearch, long timeout, boolean requireTimestamps) {
         if (timestampsToSearch.isEmpty())
             return Collections.emptyMap();
 
         long startMs = time.milliseconds();
         long remaining = timeout;
         do {
-            RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future = sendListOffsetRequests(timestampsToSearch);
+            RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
+                    sendListOffsetRequests(requireTimestamps, timestampsToSearch);
             client.poll(future, remaining);
 
             if (!future.isDone())
@@ -398,9 +426,10 @@ public class Fetcher<K, V> {
         for (TopicPartition tp : partitions)
             timestampsToSearch.put(tp, timestamp);
         Map<TopicPartition, Long> result = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : getOffsetsByTimes(timestampsToSearch, timeout).entrySet())
+        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
+                retrieveOffsetsByTimes(timestampsToSearch, timeout, false).entrySet()) {
             result.put(entry.getKey(), entry.getValue().offset());
-
+        }
         return result;
     }
 
@@ -487,10 +516,14 @@ public class Fetcher<K, V> {
     /**
      * Search the offsets by target times for the specified partitions.
      *
+     * @param requireTimestamps true if we should fail with an ObsoleteBrokerException when the broker does not
+     *                          support fetching precise timestamps for offsets
      * @param timestampsToSearch the mapping between partitions and target time
      * @return A response which can be polled to obtain the corresponding timestamps and offsets.
      */
-    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequests(final Map<TopicPartition, Long> timestampsToSearch) {
+    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequests(
+            final boolean requireTimestamps,
+            final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
@@ -518,7 +551,7 @@ public class Fetcher<K, V> {
         final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets = new HashMap<>();
         final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
-            sendListOffsetRequest(entry.getKey(), entry.getValue())
+            sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
                     .addListener(new RequestFutureListener<Map<TopicPartition, OffsetAndTimestamp>>() {
                         @Override
                         public void onSuccess(Map<TopicPartition, OffsetAndTimestamp> value) {
@@ -547,13 +580,20 @@ public class Fetcher<K, V> {
      *
      * @param node The node to send the ListOffsetRequest to.
      * @param timestampsToSearch The mapping from partitions to the target timestamps.
+     * @param requireTimestamp  True if we require a timestamp in the response.
      * @return A response which can be polled to obtain the corresponding timestamps and offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(final Node node,
-                                                                                         final Map<TopicPartition, Long> timestampsToSearch) {
-        ListOffsetRequest request = new ListOffsetRequest(timestampsToSearch, ListOffsetRequest.CONSUMER_REPLICA_ID);
-        log.trace("Sending ListOffsetRequest {} to broker {}", request, node);
-        return client.send(node, ApiKeys.LIST_OFFSETS, request)
+                                                                                         final Map<TopicPartition, Long> timestampsToSearch,
+                                                                                         boolean requireTimestamp) {
+        ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);
+
+        // If we need a timestamp in the response, the minimum RPC version we can send is v1.
+        // Otherwise, v0 is OK.
+        builder.setMinVersion(requireTimestamp ? (short) 1 : (short) 0);
+
+        log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
+        return client.send(node, builder)
                 .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() {
                     @Override
                     public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
@@ -570,6 +610,7 @@ public class Fetcher<K, V> {
      * @param listOffsetResponse The response from the server.
      * @param future The future to be completed by the response.
      */
+    @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
                                           ListOffsetResponse listOffsetResponse,
                                           RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
@@ -579,11 +620,34 @@ public class Fetcher<K, V> {
             ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
             Errors error = Errors.forCode(partitionData.errorCode);
             if (error == Errors.NONE) {
-                OffsetAndTimestamp offsetAndTimestamp = null;
-                if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET)
-                    offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp);
-                log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition);
-                timestampOffsetMap.put(topicPartition, offsetAndTimestamp);
+                if (partitionData.offsets != null) {
+                    // Handle v0 response
+                    long offset;
+                    if (partitionData.offsets.size() > 1) {
+                        future.raise(new IllegalStateException("Unexpected partitionData response of length " +
+                                partitionData.offsets.size()));
+                        return;
+                    } else if (partitionData.offsets.isEmpty()) {
+                        offset = ListOffsetResponse.UNKNOWN_OFFSET;
+                    } else {
+                        offset = partitionData.offsets.get(0);
+                    }
+                    log.debug("Handling v0 ListOffsetResponse response for {}.  Fetched offset {}",
+                            topicPartition, offset);
+                    if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                        OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(offset, -1);
+                        timestampOffsetMap.put(topicPartition, offsetAndTimestamp);
+                    }
+                } else {
+                    // Handle v1 and later response
+                    log.debug("Handling ListOffsetResponse response for {}.  Fetched offset {}, timestamp {}",
+                            topicPartition, partitionData.offset, partitionData.timestamp);
+                    if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                        OffsetAndTimestamp offsetAndTimestamp =
+                                new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp);
+                        timestampOffsetMap.put(topicPartition, offsetAndTimestamp);
+                    }
+                }
             } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                 // The message format on the broker side is before 0.10.0, we simply put null in the response.
                 log.debug("Cannot search by timestamp for partition {} because the message format version " +
@@ -598,8 +662,7 @@ public class Fetcher<K, V> {
                         "may not exist or the user may not have Describe access to it", topicPartition);
                 future.raise(error);
             } else {
-                log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
-                        topicPartition, error.message());
+                log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, error.message());
                 future.raise(new StaleMetadataException());
             }
         }
@@ -624,7 +687,7 @@ public class Fetcher<K, V> {
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
      */
-    private Map<Node, FetchRequest> createFetchRequests() {
+    private Map<Node, FetchRequest.Builder> createFetchRequests() {
         // create the fetch info
         Cluster cluster = metadata.fetch();
         Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
@@ -649,10 +712,11 @@ public class Fetcher<K, V> {
         }
 
         // create the fetches
-        Map<Node, FetchRequest> requests = new HashMap<>();
+        Map<Node, FetchRequest.Builder> requests = new HashMap<>();
         for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
+            FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
+                    setMaxBytes(this.maxBytes);
             requests.put(node, fetch);
         }
         return requests;

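Two details above carry the version-compatibility logic. First, sendListOffsetRequest
sets a version floor: v1 is the minimum when the caller needs per-offset timestamps
(KIP-79), otherwise v0 is acceptable. Second, handleListOffsetResponse normalizes the
old v0 list-of-offsets reply into the single (offset, timestamp) shape used from v1
onwards, with -1 marking the timestamp as unavailable. A standalone sketch of that
normalization (class name hypothetical; the -1 sentinel mirrors
ListOffsetResponse.UNKNOWN_OFFSET):

    import java.util.List;

    final class ListOffsetV0Sketch {
        static final long UNKNOWN_OFFSET = -1L;   // mirrors ListOffsetResponse.UNKNOWN_OFFSET

        // A v0 reply here is a list of at most one offset; an empty list
        // means the broker found no offset for the requested target time.
        static long normalize(List<Long> offsets) {
            if (offsets.size() > 1)
                throw new IllegalStateException(
                        "Unexpected partitionData response of length " + offsets.size());
            return offsets.isEmpty() ? UNKNOWN_OFFSET : offsets.get(0);
        }
    }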
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e23f74d..38f87f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -53,7 +52,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -140,7 +138,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final List<ApiKeys> PRODUCER_APIS = Arrays.asList(
             ApiKeys.METADATA,
             ApiKeys.PRODUCE);
-    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(PRODUCER_APIS);
 
     private String clientId;
     private final Partitioner partitioner;
@@ -320,7 +317,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                     this.requestTimeoutMs,
                     time,
-                    EXPECTED_API_VERSIONS);
+                    true);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,


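The producer change is small but telling: the precomputed EXPECTED_API_VERSIONS
collection is gone, and the trailing boolean passed to the NetworkClient constructor
enables runtime version discovery instead, so compatibility is negotiated per broker
via ApiVersionsRequest rather than asserted up front. A hypothetical sketch of the
version-intersection rule that such discovery makes possible:

    // Hypothetical sketch: clamp each outgoing request to the intersection
    // of the client's and the broker's supported version ranges, preferring
    // the newest mutually supported version.
    final class VersionNegotiationSketch {
        static short chooseVersion(short clientMin, short clientMax,
                                   short brokerMin, short brokerMax) {
            short max = (short) Math.min(clientMax, brokerMax);
            short min = (short) Math.max(clientMin, brokerMin);
            if (min > max)
                throw new IllegalStateException("No mutually supported protocol version");
            return max;
        }
    }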