kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [2/3] KAFKA-1326 Refactor Sender to support consumer.
Date Wed, 11 Jun 2014 15:47:59 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4352466..c67b947 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -3,20 +3,24 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -29,19 +33,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.MetadataRequest;
-import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
-import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -56,16 +52,13 @@ public class Sender implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(Sender.class);
 
     /* the state of each nodes connection */
-    private final NodeStates nodeStates;
+    private final KafkaClient client;
 
     /* the record accumulator that batches records */
     private final RecordAccumulator accumulator;
 
-    /* the selector used to perform network i/o */
-    private final Selectable selector;
-
-    /* the client id used to identify this client in requests to the server */
-    private final String clientId;
+    /* the metadata for the client */
+    private final Metadata metadata;
 
     /* the maximum request size to attempt to send to the server */
     private final int maxRequestSize;
@@ -79,67 +72,33 @@ public class Sender implements Runnable {
     /* the number of times to retry a failed request before giving up */
     private final int retries;
 
-    /* the socket send buffer size in bytes */
-    private final int socketSendBuffer;
-
-    /* the socket receive size buffer in bytes */
-    private final int socketReceiveBuffer;
-
-    /* the set of currently in-flight requests awaiting a response from the server */
-    private final InFlightRequests inFlightRequests;
-
-    /* a reference to the current Cluster instance */
-    private final Metadata metadata;
-
     /* the clock instance used for getting the time */
     private final Time time;
 
-    /* the current node to attempt to use for metadata requests (will round-robin over nodes) */
-    private int metadataFetchNodeIndex;
-
-    /* the current correlation id to use when sending requests to servers */
-    private int correlation;
-
-    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
-    private boolean metadataFetchInProgress;
-
     /* true while the sender thread is still running */
     private volatile boolean running;
 
     /* metrics */
     private final SenderMetrics sensors;
 
-    public Sender(Selectable selector,
+    public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
-                  String clientId,
                   int maxRequestSize,
-                  long reconnectBackoffMs,
                   short acks,
                   int retries,
                   int requestTimeout,
-                  int socketSendBuffer,
-                  int socketReceiveBuffer,
-                  int maxInFlightRequestsPerConnection,
                   Metrics metrics,
                   Time time) {
-        this.nodeStates = new NodeStates(reconnectBackoffMs);
+        this.client = client;
         this.accumulator = accumulator;
-        this.selector = selector;
-        this.maxRequestSize = maxRequestSize;
         this.metadata = metadata;
-        this.clientId = clientId;
+        this.maxRequestSize = maxRequestSize;
         this.running = true;
         this.requestTimeout = requestTimeout;
         this.acks = acks;
         this.retries = retries;
-        this.socketSendBuffer = socketSendBuffer;
-        this.socketReceiveBuffer = socketReceiveBuffer;
-        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
-        this.correlation = 0;
-        this.metadataFetchInProgress = false;
         this.time = time;
-        this.metadataFetchNodeIndex = new Random().nextInt();
         this.sensors = new SenderMetrics(metrics);
     }
 
@@ -169,128 +128,46 @@ public class Sender implements Runnable {
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
-        } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0);
+        } while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0);
 
-        // close all the connections
-        this.selector.close();
+        this.client.close();
 
         log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }
 
     /**
      * Run a single iteration of sending
-     *
-     * @param nowMs The current POSIX time in milliseconds
+     * 
+     * @param now The current POSIX time in milliseconds
      */
-    public void run(long nowMs) {
+    public void run(long now) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        Set<Node> ready = this.accumulator.ready(cluster, nowMs);
-
-        // should we update our metadata?
-        List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        maybeUpdateMetadata(cluster, sends, nowMs);
+        Set<Node> ready = this.accumulator.ready(cluster, now);
 
-        // prune the list of ready nodes to eliminate any that we aren't ready to send yet
-        Set<Node> sendable = processReadyNode(ready, nowMs);
+        // remove any nodes we aren't ready to send to
+        for (Node node : ready) {
+            if (!this.client.ready(node, now))
+                ready.remove(node);
+        }
 
         // create produce requests
-        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs);
-        List<InFlightRequest> requests = generateProduceRequests(batches, nowMs);
+        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now);
+        List<ClientRequest> requests = createProduceRequests(batches, now);
         sensors.updateProduceRequestMetrics(requests);
 
         if (ready.size() > 0) {
-            log.trace("Partitions with complete batches: {}", ready);
-            log.trace("Partitions ready to initiate a request: {}", sendable);
+            log.trace("Nodes with data ready to send: {}", ready);
             log.trace("Created {} produce requests: {}", requests.size(), requests);
         }
 
-        for (int i = 0; i < requests.size(); i++) {
-            InFlightRequest request = requests.get(i);
-            this.inFlightRequests.add(request);
-            sends.add(request.request);
-        }
-
-        // do the I/O
-        try {
-            this.selector.poll(100L, sends);
-        } catch (IOException e) {
-            log.error("Unexpected error during I/O in producer network thread", e);
-        }
-
-        // handle responses, connections, and disconnections
-        handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), nowMs);
-        handleDisconnects(this.selector.disconnected(), nowMs);
-        handleConnects(this.selector.connected());
-    }
-
-    /**
-     * Add a metadata request to the list of sends if we need to make one
-     */
-    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long nowMs) {
-        if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs))
-            return;
-
-        Node node = selectMetadataDestination(cluster);
-        if (node == null)
-            return;
-
-        if (nodeStates.isConnected(node.id())) {
-            Set<String> topics = metadata.topics();
-            this.metadataFetchInProgress = true;
-            InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics);
-            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-            sends.add(metadataRequest.request);
-            this.inFlightRequests.add(metadataRequest);
-        } else if (nodeStates.canConnect(node.id(), nowMs)) {
-            // we don't have a connection to this node right now, make one
-            initiateConnect(node, nowMs);
-        }
-    }
-
-    /**
-     * Find a good node to make a metadata request to. This method will first look for a node that has an existing
-     * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding
-     * requests.
-     * @return A node with no requests currently being sent or null if no such node exists
-     */
-    private Node selectMetadataDestination(Cluster cluster) {
-        List<Node> nodes = cluster.nodes();
-
-        // first look for a node to which we are connected and have no outstanding requests
-        boolean connectionInProgress = false;
-        for (int i = 0; i < nodes.size(); i++) {
-            Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
-            if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) {
-                this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
-                return node;
-            } else if (nodeStates.isConnecting(node.id())) {
-                connectionInProgress = true;
-            }
-        }
-
-        // if we have a connection that is being established now, just wait for that don't make another
-        if (connectionInProgress)
-            return null;
-
-        // okay, no luck, pick a random unused node
-        for (int i = 0; i < nodes.size(); i++) {
-            Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
-            if (this.inFlightRequests.canSendMore(node.id())) {
-                this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
-                return node;
-            }
+        List<ClientResponse> responses = this.client.poll(requests, 100L, now);
+        for (ClientResponse response : responses) {
+            if (response.wasDisconnected())
+                handleDisconnect(response, now);
+            else
+                handleResponse(response, now);
         }
-
-        return null; // we failed to find a good destination
-    }
-
-    /**
-     * Get the index in the node list of the node to use for the metadata request
-     */
-    private int metadataNodeIndex(int offset, int size) {
-        return Utils.abs(offset + this.metadataFetchNodeIndex) % size;
     }
 
     /**
@@ -302,161 +179,40 @@ public class Sender implements Runnable {
         this.wakeup();
     }
 
-    /**
-     * Process the set of destination nodes with data ready to send.
-     *
-     * 1) If we have an unknown leader node, force refresh the metadata.
-     * 2) If we have a connection to the appropriate node, add
-     *    it to the returned set;
-     * 3) If we have not a connection yet, initialize one
-     */
-    private Set<Node> processReadyNode(Set<Node> ready, long nowMs) {
-        Set<Node> sendable = new HashSet<Node>(ready.size());
-        for (Node node : ready) {
-            if (node == null) {
-                // we don't know about this topic/partition or it has no leader, re-fetch metadata
-                metadata.forceUpdate();
-            } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
-                sendable.add(node);
-            } else if (nodeStates.canConnect(node.id(), nowMs)) {
-                // we don't have a connection to this node right now, make one
-                initiateConnect(node, nowMs);
-            }
-        }
-        return sendable;
-    }
-
-    /**
-     * Initiate a connection to the given node
-     */
-    private void initiateConnect(Node node, long nowMs) {
-        try {
-            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
-            this.nodeStates.connecting(node.id(), nowMs);
-        } catch (IOException e) {
-            /* attempt failed, we'll try again after the backoff */
-            nodeStates.disconnected(node.id());
-            /* maybe the problem is our metadata, update it */
-            metadata.forceUpdate();
-            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
-        }
-    }
-
-    /**
-     * Handle any closed connections
-     */
-    private void handleDisconnects(List<Integer> disconnects, long nowMs) {
-        // clear out the in-flight requests for the disconnected broker
-        for (int node : disconnects) {
-            nodeStates.disconnected(node);
-            log.debug("Node {} disconnected.", node);
-            for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
-                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
-                ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
-                switch (requestKey) {
-                    case PRODUCE:
-                        int correlation = request.request.header().correlationId();
-                        for (RecordBatch batch : request.batches.values())
-                            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs);
-                        break;
-                    case METADATA:
-                        metadataFetchInProgress = false;
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id);
-                }
-            }
-        }
-        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
-        if (disconnects.size() > 0)
-            this.metadata.forceUpdate();
-    }
-
-    /**
-     * Record any connections that completed in our node state
-     */
-    private void handleConnects(List<Integer> connects) {
-        for (Integer id : connects) {
-            log.debug("Completed connection to node {}", id);
-            this.nodeStates.connected(id);
-        }
-    }
-
-    /**
-     * Process completed sends
-     */
-    public void handleSends(List<NetworkSend> sends) {
-        /* if acks = 0 then the request is satisfied once sent */
-        for (NetworkSend send : sends) {
-            Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
-            InFlightRequest request = requests.peekFirst();
-            log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request);
-            if (!request.expectResponse) {
-                requests.pollFirst();
-                if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
-                    for (RecordBatch batch : request.batches.values()) {
-                        batch.done(-1L, Errors.NONE.exception());
-                        this.accumulator.deallocate(batch);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Handle responses from the server
-     */
-    private void handleResponses(List<NetworkReceive> receives, long nowMs) {
-        for (NetworkReceive receive : receives) {
-            int source = receive.source();
-            InFlightRequest req = inFlightRequests.nextCompleted(source);
-            ResponseHeader header = ResponseHeader.parse(receive.payload());
-            short apiKey = req.request.header().apiKey();
-            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
-            correlate(req.request.header(), header);
-            if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
-                log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
-                handleProduceResponse(req, req.request.header(), body, nowMs);
-            } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
-                log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
-                                                                                                                        .correlationId());
-                handleMetadataResponse(req.request.header(), body, nowMs);
-            } else {
-                throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
-            }
-            this.sensors.recordLatency(receive.source(), nowMs - req.createdMs);
-        }
-
-    }
-
-    private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) {
-        this.metadataFetchInProgress = false;
-        MetadataResponse response = new MetadataResponse(body);
-        Cluster cluster = response.cluster();
-        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
-        // created which means we will get errors and no nodes until it exists
-        if (cluster.nodes().size() > 0)
-            this.metadata.update(cluster, nowMs);
-        else
-            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+    private void handleDisconnect(ClientResponse response, long now) {
+        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
+        int correlation = response.request().request().header().correlationId();
+        @SuppressWarnings("unchecked")
+        Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
+        for (RecordBatch batch : responseBatches.values())
+            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
     }
 
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) {
-        ProduceResponse pr = new ProduceResponse(body);
-        for (Map<TopicPartition, ProduceResponse.PartitionResponse> responses : pr.responses().values()) {
-            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : responses.entrySet()) {
+    private void handleResponse(ClientResponse response, long now) {
+        int correlationId = response.request().request().header().correlationId();
+        log.trace("Received produce response from node {} with correlation id {}",
+                  response.request().request().destination(),
+                  correlationId);
+        @SuppressWarnings("unchecked")
+        Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
+        // if we have a response, parse it
+        if (response.hasResponse()) {
+            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
+            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                 TopicPartition tp = entry.getKey();
-                ProduceResponse.PartitionResponse response = entry.getValue();
-                Errors error = Errors.forCode(response.errorCode);
-                if (error.exception() instanceof InvalidMetadataException)
-                    metadata.forceUpdate();
-                RecordBatch batch = request.batches.get(tp);
-                completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs);
+                ProduceResponse.PartitionResponse partResp = entry.getValue();
+                Errors error = Errors.forCode(partResp.errorCode);
+                RecordBatch batch = batches.get(tp);
+                completeBatch(batch, error, partResp.baseOffset, correlationId, now);
             }
+            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+        } else {
+            // this is the acks = 0 case, just complete all requests
+            for (RecordBatch batch : batches.values())
+                completeBatch(batch, Errors.NONE, -1L, correlationId, now);
         }
     }
 
@@ -466,9 +222,9 @@ public class Sender implements Runnable {
      * @param error The error (or null if none)
      * @param baseOffset The base offset assigned to the records if successful
      * @param correlationId The correlation id for the request
-     * @param nowMs The current POSIX time stamp in milliseconds
+     * @param now The current POSIX time stamp in milliseconds
      */
-    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) {
+    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
         if (error != Errors.NONE && canRetry(batch, error)) {
             // retry
             log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
@@ -476,7 +232,7 @@ public class Sender implements Runnable {
                      batch.topicPartition,
                      this.retries - batch.attempts - 1,
                      error);
-            this.accumulator.reenqueue(batch, nowMs);
+            this.accumulator.reenqueue(batch, now);
             this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
         } else {
             // tell the user the result of their request
@@ -485,6 +241,8 @@ public class Sender implements Runnable {
             if (error != Errors.NONE)
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         }
+        if (error.exception() instanceof InvalidMetadataException)
+            metadata.forceUpdate();
     }
 
     /**
@@ -495,257 +253,35 @@ public class Sender implements Runnable {
     }
 
     /**
-     * Validate that the response corresponds to the request we expect or else explode
-     */
-    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
-        if (requestHeader.correlationId() != responseHeader.correlationId())
-            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() +
-                                            ") does not match request (" +
-                                            requestHeader.correlationId() +
-                                            ")");
-    }
-
-    /**
-     * Create a metadata request for the given topics
-     */
-    private InFlightRequest metadataRequest(long nowMs, int node, Set<String> topics) {
-        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
-        RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
-        return new InFlightRequest(nowMs, true, send, null);
-    }
-
-    /**
      * Transfer the record batches into a list of produce requests on a per-node basis
      */
-    private List<InFlightRequest> generateProduceRequests(Map<Integer, List<RecordBatch>> collated, long nowMs) {
-        List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
+    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
+        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
-            requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue()));
+            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
         return requests;
     }
 
     /**
      * Create a produce request from the given record batches
      */
-    private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List<RecordBatch> batches) {
-        Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
-        Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
+    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
+        ProduceRequest request = new ProduceRequest(acks, timeout);
+        Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
-            batchesByPartition.put(batch.topicPartition, batch);
-            List<RecordBatch> found = batchesByTopic.get(batch.topicPartition.topic());
-            if (found == null) {
-                found = new ArrayList<RecordBatch>();
-                batchesByTopic.put(batch.topicPartition.topic(), found);
-            }
-            found.add(batch);
-        }
-        Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
-        produce.set("acks", acks);
-        produce.set("timeout", timeout);
-        List<Struct> topicDatas = new ArrayList<Struct>(batchesByTopic.size());
-        for (Map.Entry<String, List<RecordBatch>> entry : batchesByTopic.entrySet()) {
-            Struct topicData = produce.instance("topic_data");
-            topicData.set("topic", entry.getKey());
-            List<RecordBatch> parts = entry.getValue();
-            Object[] partitionData = new Object[parts.size()];
-            for (int i = 0; i < parts.size(); i++) {
-                ByteBuffer buffer = parts.get(i).records.buffer();
-                buffer.flip();
-                Struct part = topicData.instance("data")
-                                       .set("partition", parts.get(i).topicPartition.partition())
-                                       .set("record_set", buffer);
-                partitionData[i] = part;
-            }
-            topicData.set("data", partitionData);
-            topicDatas.add(topicData);
+            batch.records.buffer().flip();
+            request.add(batch.topicPartition, batch.records);
+            recordsByPartition.put(batch.topicPartition, batch);
         }
-        produce.set("topic_data", topicDatas.toArray());
-
-        RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
-        return new InFlightRequest(nowMs, acks != 0, send, batchesByPartition);
-    }
-
-    private RequestHeader header(ApiKeys key) {
-        return new RequestHeader(key.id, clientId, correlation++);
+        RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
+        return new ClientRequest(now, acks != 0, send, recordsByPartition);
     }
 
     /**
      * Wake up the selector associated with this send thread
      */
     public void wakeup() {
-        this.selector.wakeup();
-    }
-
-    /**
-     * The states of a node connection
-     */
-    private static enum ConnectionState {
-        DISCONNECTED, CONNECTING, CONNECTED
-    }
-
-    /**
-     * The state of a node
-     */
-    private static final class NodeState {
-        private ConnectionState state;
-        private long lastConnectAttemptMs;
-
-        public NodeState(ConnectionState state, long lastConnectAttempt) {
-            this.state = state;
-            this.lastConnectAttemptMs = lastConnectAttempt;
-        }
-
-        public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
-        }
-    }
-
-    private static class NodeStates {
-        private final long reconnectBackoffMs;
-        private final Map<Integer, NodeState> nodeState;
-
-        public NodeStates(long reconnectBackoffMs) {
-            this.reconnectBackoffMs = reconnectBackoffMs;
-            this.nodeState = new HashMap<Integer, NodeState>();
-        }
-
-        public boolean canConnect(int node, long nowMs) {
-            NodeState state = nodeState.get(node);
-            if (state == null)
-                return true;
-            else
-                return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs;
-        }
-
-        public void connecting(int node, long nowMs) {
-            nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs));
-        }
-
-        public boolean isConnected(int node) {
-            NodeState state = nodeState.get(node);
-            return state != null && state.state == ConnectionState.CONNECTED;
-        }
-
-        public boolean isConnecting(int node) {
-            NodeState state = nodeState.get(node);
-            return state != null && state.state == ConnectionState.CONNECTING;
-        }
-
-        public void connected(int node) {
-            nodeState(node).state = ConnectionState.CONNECTED;
-        }
-
-        public void disconnected(int node) {
-            nodeState(node).state = ConnectionState.DISCONNECTED;
-        }
-
-        private NodeState nodeState(int node) {
-            NodeState state = this.nodeState.get(node);
-            if (state == null)
-                throw new IllegalStateException("No entry found for node " + node);
-            return state;
-        }
-    }
-
-    /**
-     * A request that hasn't been fully processed yet
-     */
-    private static final class InFlightRequest {
-        public long createdMs;
-        public boolean expectResponse;
-        public Map<TopicPartition, RecordBatch> batches;
-        public RequestSend request;
-
-        /**
-         * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
-         * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-         * @param request The request
-         * @param batches The record batches contained in the request if it is a produce request
-         */
-        public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
-            this.createdMs = createdMs;
-            this.batches = batches;
-            this.request = request;
-            this.expectResponse = expectResponse;
-        }
-
-        @Override
-        public String toString() {
-            return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")";
-        }
-    }
-
-    /**
-     * A set of outstanding request queues for each node that have not yet received responses
-     */
-    private static final class InFlightRequests {
-        private final int maxInFlightRequestsPerConnection;
-        private final Map<Integer, Deque<InFlightRequest>> requests;
-
-        public InFlightRequests(int maxInFlightRequestsPerConnection) {
-            this.requests = new HashMap<Integer, Deque<InFlightRequest>>();
-            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
-        }
-
-        /**
-         * Add the given request to the queue for the node it was directed to
-         */
-        public void add(InFlightRequest request) {
-            Deque<InFlightRequest> reqs = this.requests.get(request.request.destination());
-            if (reqs == null) {
-                reqs = new ArrayDeque<InFlightRequest>();
-                this.requests.put(request.request.destination(), reqs);
-            }
-            reqs.addFirst(request);
-        }
-
-        public Deque<InFlightRequest> requestQueue(int node) {
-            Deque<InFlightRequest> reqs = requests.get(node);
-            if (reqs == null || reqs.isEmpty())
-                throw new IllegalStateException("Response from server for which there are no in-flight requests.");
-            return reqs;
-        }
-
-        /**
-         * Get the oldest request (the one that will be completed next) for the given node
-         */
-        public InFlightRequest nextCompleted(int node) {
-            return requestQueue(node).pollLast();
-        }
-
-        /**
-         * Can we send more requests to this node?
-         *
-         * @param node Node in question
-         * @return true iff we have no requests still being sent to the given node
-         */
-        public boolean canSendMore(int node) {
-            Deque<InFlightRequest> queue = requests.get(node);
-            return queue == null || queue.isEmpty() ||
-                   (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection);
-        }
-
-        /**
-         * Clear out all the in-flight requests for the given node and return them
-         *
-         * @param node The node
-         * @return All the in-flight requests for that node that have been removed
-         */
-        public Iterable<InFlightRequest> clearAll(int node) {
-            Deque<InFlightRequest> reqs = requests.get(node);
-            if (reqs == null) {
-                return Collections.emptyList();
-            } else {
-                return requests.remove(node);
-            }
-        }
-
-        public int totalInFlightRequests() {
-            int total = 0;
-            for (Deque<InFlightRequest> deque : this.requests.values())
-                total += deque.size();
-            return total;
-        }
+        this.client.wakeup();
     }
 
     /**
@@ -770,9 +306,7 @@ public class Sender implements Runnable {
             this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
-            this.compressionRateSensor.add("compression-rate-avg",
-                                     "The average compression rate of record batches.",
-                                     new Avg());
+            this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg());
 
             this.queueTimeSensor = metrics.sensor("queue-time");
             this.queueTimeSensor.add("record-queue-time-avg",
@@ -800,13 +334,13 @@ public class Sender implements Runnable {
             this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
 
             this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
-                public double measure(MetricConfig config, long nowMs) {
-                    return inFlightRequests.totalInFlightRequests();
+                public double measure(MetricConfig config, long now) {
+                    return client.inFlightRequestCount();
                 }
             });
             metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
-                public double measure(MetricConfig config, long nowMs) {
-                    return (nowMs - metadata.lastUpdate()) / 1000.0;
+                public double measure(MetricConfig config, long now) {
+                    return (now - metadata.lastUpdate()) / 1000.0;
                 }
             });
         }
@@ -838,14 +372,15 @@ public class Sender implements Runnable {
             }
         }
 
-        public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
-            long nowMs = time.milliseconds();
+        public void updateProduceRequestMetrics(List<ClientRequest> requests) {
+            long now = time.milliseconds();
             for (int i = 0; i < requests.size(); i++) {
-                InFlightRequest request = requests.get(i);
+                ClientRequest request = requests.get(i);
                 int records = 0;
 
-                if (request.batches != null) {
-                    for (RecordBatch batch : request.batches.values()) {
+                if (request.attachment() != null) {
+                    Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) request.attachment();
+                    for (RecordBatch batch : responseBatches.values()) {
 
                         // register all per-topic metrics at once
                         String topic = batch.topicPartition.topic();
@@ -867,43 +402,43 @@ public class Sender implements Runnable {
                         topicCompressionRate.record(batch.records.compressionRate());
 
                         // global metrics
-                        this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs);
-                        this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs);
+                        this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
+                        this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
                         this.compressionRateSensor.record(batch.records.compressionRate());
-                        this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs);
+                        this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
                         records += batch.recordCount;
                     }
-                    this.recordsPerRequestSensor.record(records, nowMs);
+                    this.recordsPerRequestSensor.record(records, now);
                 }
             }
         }
 
         public void recordRetries(String topic, int count) {
-            long nowMs = time.milliseconds();
-            this.retrySensor.record(count, nowMs);
+            long now = time.milliseconds();
+            this.retrySensor.record(count, now);
             String topicRetryName = "topic." + topic + ".record-retries";
             Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
             if (topicRetrySensor != null)
-              topicRetrySensor.record(count, nowMs);
+                topicRetrySensor.record(count, now);
         }
 
         public void recordErrors(String topic, int count) {
-            long nowMs = time.milliseconds();
-            this.errorSensor.record(count, nowMs);
+            long now = time.milliseconds();
+            this.errorSensor.record(count, now);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
             if (topicErrorSensor != null)
-              topicErrorSensor.record(count, nowMs);
+                topicErrorSensor.record(count, now);
         }
 
         public void recordLatency(int node, long latency) {
-            long nowMs = time.milliseconds();
-            this.requestTimeSensor.record(latency, nowMs);
+            long now = time.milliseconds();
+            this.requestTimeSensor.record(latency, now);
             if (node >= 0) {
                 String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
                 if (nodeRequestTime != null)
-                  nodeRequestTime.record(latency, nowMs);
+                    nodeRequestTime.record(latency, now);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
index 7c2e33c..79f61bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
@@ -24,9 +20,9 @@ public interface Measurable {
     /**
      * Measure this quantity and return the result as a double
      * @param config The configuration for this metric
-     * @param nowMs The POSIX time in milliseconds the measurement is being taken
+     * @param now The POSIX time in milliseconds the measurement is being taken
      * @return The measured value
      */
-    public double measure(MetricConfig config, long nowMs);
+    public double measure(MetricConfig config, long now);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
index c9963cb..ed6767f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -20,7 +16,6 @@ import java.util.List;
 
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
  * A {@link SampledStat} that maintains a simple average over its samples.
  */
@@ -31,12 +26,12 @@ public class Avg extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
         sample.value += value;
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
         double total = 0.0;
         long count = 0;
         for (int i = 0; i < samples.size(); i++) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
index efcd61b..90c0bf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -20,7 +16,6 @@ import java.util.List;
 
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
  * A {@link SampledStat} that maintains a simple count of what it has seen.
  */
@@ -31,12 +26,12 @@ public class Count extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
         sample.value += 1.0;
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
         double total = 0.0;
         for (int i = 0; i < samples.size(); i++)
             total += samples.get(i).value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
index c492c38..6bbb0a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -20,7 +16,6 @@ import java.util.List;
 
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
  * A {@link SampledStat} that gives the max over its samples.
  */
@@ -31,12 +26,12 @@ public final class Max extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
         sample.value = Math.max(sample.value, value);
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
         double max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < samples.size(); i++)
             max = Math.max(max, samples.get(i).value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
index bd0919c..9f74417 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -20,7 +16,6 @@ import java.util.List;
 
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
  * A {@link SampledStat} that gives the min over its samples.
  */
@@ -31,12 +26,12 @@ public class Min extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
         sample.value = Math.min(sample.value, value);
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
         double max = Double.MAX_VALUE;
         for (int i = 0; i < samples.size(); i++)
             max = Math.min(max, samples.get(i).value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 8300978..c70d577 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -26,7 +22,6 @@ import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
 import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
 import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;
 
-
 /**
  * A compound stat that reports one or more percentiles
  */
@@ -65,16 +60,16 @@ public class Percentiles extends SampledStat implements CompoundStat {
         for (Percentile percentile : this.percentiles) {
             final double pct = percentile.percentile();
             ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
-                public double measure(MetricConfig config, long nowMs) {
-                    return value(config, nowMs, pct / 100.0);
+                public double measure(MetricConfig config, long now) {
+                    return value(config, now, pct / 100.0);
                 }
             }));
         }
         return ms;
     }
 
-    public double value(MetricConfig config, long nowMs, double quantile) {
-        purgeObsoleteSamples(config, nowMs);
+    public double value(MetricConfig config, long now, double quantile) {
+        purgeObsoleteSamples(config, now);
         float count = 0.0f;
         for (Sample sample : this.samples)
             count += sample.eventCount;
@@ -94,8 +89,8 @@ public class Percentiles extends SampledStat implements CompoundStat {
         return Double.POSITIVE_INFINITY;
     }
 
-    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
-        return value(config, nowMs, 0.5);
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        return value(config, now, 0.5);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 4b481a5..a5838b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -56,9 +56,9 @@ public class Rate implements MeasurableStat {
     }
 
     @Override
-    public double measure(MetricConfig config, long nowMs) {
-        double value = stat.measure(config, nowMs);
-        double elapsed = convert(nowMs - stat.oldest(nowMs).lastWindowMs);
+    public double measure(MetricConfig config, long now) {
+        double value = stat.measure(config, now);
+        double elapsed = convert(now - stat.oldest(now).lastWindowMs);
         return value / elapsed;
     }
 
@@ -95,7 +95,7 @@ public class Rate implements MeasurableStat {
         }
 
         @Override
-        public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+        public double combine(List<Sample> samples, MetricConfig config, long now) {
             double total = 0.0;
             for (int i = 0; i < samples.size(); i++)
                 total += samples.get(i).value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index 0d4056f..b341b7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -66,9 +66,9 @@ public abstract class SampledStat implements MeasurableStat {
     }
 
     @Override
-    public double measure(MetricConfig config, long nowMs) {
-        purgeObsoleteSamples(config, nowMs);
-        return combine(this.samples, config, nowMs);
+    public double measure(MetricConfig config, long now) {
+        purgeObsoleteSamples(config, now);
+        return combine(this.samples, config, now);
     }
 
     public Sample current(long timeMs) {
@@ -77,9 +77,9 @@ public abstract class SampledStat implements MeasurableStat {
         return this.samples.get(this.current);
     }
 
-    public Sample oldest(long nowMs) {
+    public Sample oldest(long now) {
         if (samples.size() == 0)
-            this.samples.add(newSample(nowMs));
+            this.samples.add(newSample(now));
         Sample oldest = this.samples.get(0);
         for (int i = 1; i < this.samples.size(); i++) {
             Sample curr = this.samples.get(i);
@@ -91,15 +91,15 @@ public abstract class SampledStat implements MeasurableStat {
 
     protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
 
-    public abstract double combine(List<Sample> samples, MetricConfig config, long nowMs);
+    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
 
     /* Timeout any windows that have expired in the absence of any events */
-    protected void purgeObsoleteSamples(MetricConfig config, long nowMs) {
+    protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (int i = 0; i < samples.size(); i++) {
             Sample sample = this.samples.get(i);
-            if (nowMs - sample.lastWindowMs >= expireAge)
-                sample.reset(nowMs);
+            if (now - sample.lastWindowMs >= expireAge)
+                sample.reset(now);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
index 53dd3d5..67999a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -35,12 +31,12 @@ public class Total implements MeasurableStat {
     }
 
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
+    public void record(MetricConfig config, double value, long now) {
         this.total += value;
     }
 
     @Override
-    public double measure(MetricConfig config, long nowMs) {
+    public double measure(MetricConfig config, long now) {
         return this.total;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 6350424..c8213e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -42,7 +42,7 @@ public class ByteBufferSend implements Send {
     }
 
     @Override
-    public boolean complete() {
+    public boolean completed() {
         return remaining <= 0;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 3e35898..93f2f1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -414,8 +414,8 @@ public class Selector implements Selectable {
 
             this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
             bytesTransferred.add("network-io-rate",
-                "The average number of network operations (reads or writes) on all connections per second.",
-                new Rate(new Count()));
+                                 "The average number of network operations (reads or writes) on all connections per second.",
+                                 new Rate(new Count()));
 
             this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
             this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
@@ -429,11 +429,11 @@ public class Selector implements Selectable {
 
             this.selectTime = this.metrics.sensor("select-time");
             this.selectTime.add("select-rate",
-                "Number of times the I/O layer checked for new I/O to perform per second",
-                new Rate(new Count()));
+                                "Number of times the I/O layer checked for new I/O to perform per second",
+                                new Rate(new Count()));
             this.selectTime.add("io-wait-time-ns-avg",
-                "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
-                new Avg());
+                                "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
+                                new Avg());
             this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
 
             this.ioTime = this.metrics.sensor("io-time");
@@ -441,7 +441,7 @@ public class Selector implements Selectable {
             this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
 
             this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
-                public double measure(MetricConfig config, long nowMs) {
+                public double measure(MetricConfig config, long now) {
                     return keys.size();
                 }
             });
@@ -456,7 +456,9 @@ public class Selector implements Selectable {
                 if (nodeRequest == null) {
                     nodeRequest = this.metrics.sensor(nodeRequestName);
                     nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
-                    nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
+                    nodeRequest.add("node-" + node + ".request-rate",
+                                    "The average number of requests sent per second.",
+                                    new Rate(new Count()));
                     nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
                     nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
 
@@ -464,8 +466,8 @@ public class Selector implements Selectable {
                     Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
                     nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
                     nodeResponse.add("node-" + node + ".response-rate",
-                        "The average number of responses received per second.",
-                        new Rate(new Count()));
+                                     "The average number of responses received per second.",
+                                     new Rate(new Count()));
 
                     String nodeTimeName = "node-" + node + ".latency";
                     Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
@@ -476,22 +478,24 @@ public class Selector implements Selectable {
         }
 
         public void recordBytesSent(int node, int bytes) {
-            long nowMs = time.milliseconds();
-            this.bytesSent.record(bytes, nowMs);
+            long now = time.milliseconds();
+            this.bytesSent.record(bytes, now);
             if (node >= 0) {
                 String nodeRequestName = "node-" + node + ".bytes-sent";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null) nodeRequest.record(bytes, nowMs);
+                if (nodeRequest != null)
+                    nodeRequest.record(bytes, now);
             }
         }
 
         public void recordBytesReceived(int node, int bytes) {
-            long nowMs = time.milliseconds();
-            this.bytesReceived.record(bytes, nowMs);
+            long now = time.milliseconds();
+            this.bytesReceived.record(bytes, now);
             if (node >= 0) {
                 String nodeRequestName = "node-" + node + ".bytes-received";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null) nodeRequest.record(bytes, nowMs);
+                if (nodeRequest != null)
+                    nodeRequest.record(bytes, now);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index d62dff9..5d321a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -38,7 +34,7 @@ public interface Send {
     /**
      * Is this send complete?
      */
-    public boolean complete();
+    public boolean completed();
 
     /**
      * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 68b8827..7164701 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.protocol.types;
 
@@ -51,8 +47,9 @@ public class Schema extends Type {
                 Object value = f.type().validate(r.get(f));
                 f.type.write(buffer, value);
             } catch (Exception e) {
-                throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass()
-                                                                                                                                .getName());
+                throw new SchemaException("Error writing field '" + f.name +
+                                          "': " +
+                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
     }
@@ -66,8 +63,9 @@ public class Schema extends Type {
             try {
                 objects[i] = fields[i].type.read(buffer);
             } catch (Exception e) {
-                throw new SchemaException("Error reading field '" + fields[i].name + "': " + e.getMessage() == null ? e.getMessage()
-                                                                                                                   : e.getClass().getName());
+                throw new SchemaException("Error reading field '" + fields[i].name +
+                                          "': " +
+                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
         }
         return new Struct(this, objects);

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 15c9577..759f577 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.record;
 
@@ -88,14 +84,14 @@ public class MemoryRecords implements Records {
 
     /**
      * Check if we have room for a new record containing the given key/value pair
-     *
-     * Note that the return value is based on the estimate of the bytes written to the compressor,
-     * which may not be accurate if compression is really used. When this happens, the following
-     * append may cause dynamic buffer re-allocation in the underlying byte buffer stream.
+     * 
+     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
+     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
+     * re-allocation in the underlying byte buffer stream.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable &&
-            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
+        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
+                                                 Record.recordSize(key, value);
     }
 
     public boolean isFull() {
@@ -169,10 +165,10 @@ public class MemoryRecords implements Records {
 
         /*
          * Read the next record from the buffer.
-         *
-         * Note that in the compressed message set, each message value size is set as the size
-         * of the un-compressed version of the message value, so when we do de-compression
-         * allocating an array of the specified size for reading compressed value data is sufficient.
+         * 
+         * Note that in the compressed message set, each message value size is set as the size of the un-compressed
+         * version of the message value, so when we do de-compression allocating an array of the specified size for
+         * reading compressed value data is sufficient.
          */
         @Override
         protected LogEntry makeNext() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
new file mode 100644
index 0000000..6036f6a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -0,0 +1,71 @@
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
+
+/**
+ * Builder for a PRODUCE request: accumulates per-partition record sets and serializes
+ * them into a {@link Struct} matching the current PRODUCE request protocol schema.
+ *
+ * NOTE(review): no synchronization is visible — presumably built and serialized from a
+ * single thread; confirm against callers.
+ */
+public class ProduceRequest {
+
+    // Acknowledgment setting written into the request's "acks" field.
+    private final short acks;
+    // Timeout written into the request's "timeout" field — presumably milliseconds; TODO confirm.
+    private final int timeout;
+    // Pending record sets, grouped by topic name; each entry pairs a partition with its records.
+    private final Map<String, List<PartitionRecords>> records;
+
+    public ProduceRequest(short acks, int timeout) {
+        this.acks = acks;
+        this.timeout = timeout;
+        this.records = new HashMap<String, List<PartitionRecords>>();
+    }
+
+    /**
+     * Add a record set destined for the given topic partition, creating the per-topic
+     * list lazily on first use.
+     */
+    public void add(TopicPartition tp, MemoryRecords recs) {
+        List<PartitionRecords> found = this.records.get(tp.topic());
+        if (found == null) {
+            found = new ArrayList<PartitionRecords>();
+            records.put(tp.topic(), found);
+        }
+        found.add(new PartitionRecords(tp, recs));
+    }
+
+    /**
+     * Serialize this request into a {@link Struct} for the current PRODUCE schema:
+     * "acks", "timeout", then a "topic_data" array whose entries hold a topic name and a
+     * "data" array of (partition, record_set) pairs.
+     */
+    public Struct toStruct() {
+        Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
+        produce.set("acks", acks);
+        produce.set("timeout", timeout);
+        List<Struct> topicDatas = new ArrayList<Struct>(records.size());
+        for (Map.Entry<String, List<PartitionRecords>> entry : records.entrySet()) {
+            Struct topicData = produce.instance("topic_data");
+            topicData.set("topic", entry.getKey());
+            List<PartitionRecords> parts = entry.getValue();
+            Object[] partitionData = new Object[parts.size()];
+            for (int i = 0; i < parts.size(); i++) {
+                // Flip the records buffer so the serialized view covers the bytes written so far.
+                // NOTE(review): this mutates the shared buffer — calling toStruct() twice would
+                // flip twice; looks like single-use is assumed, confirm.
+                ByteBuffer buffer = parts.get(i).records.buffer();
+                buffer.flip();
+                Struct part = topicData.instance("data")
+                                       .set("partition", parts.get(i).topicPartition.partition())
+                                       .set("record_set", buffer);
+                partitionData[i] = part;
+            }
+            topicData.set("data", partitionData);
+            topicDatas.add(topicData);
+        }
+        produce.set("topic_data", topicDatas.toArray());
+        return produce;
+    }
+
+    /** Immutable pairing of a topic partition with the records destined for it. */
+    private static final class PartitionRecords {
+        public final TopicPartition topicPartition;
+        public final MemoryRecords records;
+
+        public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) {
+            this.topicPartition = topicPartition;
+            this.records = records;
+        }
+    }
+
+}


Mime
View raw message