kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/3] kafka git commit: KAFKA-2063; Bound fetch response size (KIP-74)
Date Sun, 18 Sep 2016 16:16:02 GMT
KAFKA-2063; Bound fetch response size (KIP-74)

This PR is implementation of [KIP-74](https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes) which is originally motivated by [KAFKA-2063](https://issues.apache.org/jira/browse/KAFKA-2063).

Author: Andrey Neporada <neporada@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1812 from nepal/kip-74


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d04b0998
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d04b0998
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d04b0998

Branch: refs/heads/trunk
Commit: d04b0998c043a6a430921585ffd4c42572a3bf5a
Parents: 6fb25f0
Author: Andrey Neporada <neporada@gmail.com>
Authored: Sun Sep 18 09:12:53 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Sun Sep 18 09:15:16 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  23 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   1 +
 .../clients/consumer/internals/Fetcher.java     |  42 ++--
 .../consumer/internals/SubscriptionState.java   |  82 ++++---
 .../kafka/common/internals/PartitionStates.java | 174 ++++++++++++++
 .../apache/kafka/common/protocol/Protocol.java  |  25 +-
 .../kafka/common/requests/FetchRequest.java     |  96 ++++++--
 .../kafka/common/requests/FetchResponse.java    |  91 ++++----
 .../clients/consumer/KafkaConsumerTest.java     |   2 +
 .../clients/consumer/internals/FetcherTest.java |  23 +-
 .../common/internals/PartitionStatesTest.java   | 219 +++++++++++++++++
 .../common/requests/RequestResponseTest.java    |  26 ++-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  18 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  48 ----
 core/src/main/scala/kafka/api/ApiVersion.scala  |  11 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |  86 +++++--
 .../main/scala/kafka/api/FetchResponse.scala    |  41 ++--
 .../scala/kafka/consumer/ConsumerConfig.scala   |   6 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  24 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  33 +--
 .../coordinator/GroupMetadataManager.scala      |   2 +-
 .../main/scala/kafka/javaapi/FetchRequest.scala |  48 ++--
 .../main/scala/kafka/log/FileMessageSet.scala   |  12 +-
 core/src/main/scala/kafka/log/Log.scala         |   5 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  46 ++--
 .../kafka/server/AbstractFetcherManager.scala   |  13 +-
 .../kafka/server/AbstractFetcherThread.scala    | 118 ++++++----
 .../main/scala/kafka/server/DelayedFetch.scala  |  22 +-
 .../main/scala/kafka/server/FetchDataInfo.scala |   3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  31 +--
 .../main/scala/kafka/server/KafkaConfig.scala   |  16 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  93 ++++----
 .../scala/kafka/server/ReplicaManager.scala     | 199 +++++++++-------
 .../kafka/api/AuthorizerIntegrationTest.scala   |   5 +-
 .../kafka/api/BaseConsumerTest.scala            |  30 +--
 .../kafka/api/PlaintextConsumerTest.scala       | 133 +++++++++--
 .../kafka/api/ProducerFailureHandlingTest.scala |  94 ++++----
 .../api/RequestResponseSerializationTest.scala  |   8 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   6 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  38 +--
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  58 +++++
 .../server/AbstractFetcherThreadTest.scala      |  26 +--
 .../unit/kafka/server/FetchRequestTest.scala    | 233 +++++++++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 .../kafka/server/ReplicaManagerQuotasTest.scala |  30 +--
 .../unit/kafka/server/ReplicaManagerTest.scala  |  26 ++-
 .../unit/kafka/server/SimpleFetchTest.scala     |  25 +-
 48 files changed, 1716 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 4ce908e..ae791b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -113,6 +113,17 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
 
     /**
+     * <code>fetch.max.bytes</code>
+     */
+    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
+    private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
+            "This is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than " +
+            "this value, the message will still be returned to ensure that the consumer can make progress. " +
+            "The maximum message size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
+    public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
+
+    /**
      * <code>fetch.max.wait.ms</code>
      */
     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
@@ -125,7 +136,11 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>max.partition.fetch.bytes</code>
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
-    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
+    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server " +
+            "will return. If the first message in the first non-empty partition of the fetch is larger than this limit, the " +
+            "message will still be returned to ensure that the consumer can make progress. The maximum message size " +
+            "accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size";
     public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 
     /** <code>send.buffer.bytes</code> */
@@ -265,6 +280,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.HIGH,
                                         FETCH_MIN_BYTES_DOC)
+                                .define(FETCH_MAX_BYTES_CONFIG,
+                                        Type.INT,
+                                        DEFAULT_FETCH_MAX_BYTES,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        FETCH_MAX_BYTES_DOC)
                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
                                         Type.INT,
                                         500,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8fa7ab7..f7f2d20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -672,6 +672,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
             this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dd9d084..202c0ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -62,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -79,6 +79,7 @@ public class Fetcher<K, V> {
     private final ConsumerNetworkClient client;
     private final Time time;
     private final int minBytes;
+    private final int maxBytes;
     private final int maxWaitMs;
     private final int fetchSize;
     private final long retryBackoffMs;
@@ -96,6 +97,7 @@ public class Fetcher<K, V> {
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
+                   int maxBytes,
                    int maxWaitMs,
                    int fetchSize,
                    int maxPollRecords,
@@ -113,6 +115,7 @@ public class Fetcher<K, V> {
         this.metadata = metadata;
         this.subscriptions = subscriptions;
         this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
@@ -152,7 +155,7 @@ public class Fetcher<K, V> {
      * an in-flight fetch or pending fetch data.
      */
     public void sendFetches() {
-        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
+        for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
@@ -514,8 +517,8 @@ public class Fetcher<K, V> {
         }
     }
 
-    private Set<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
+    private List<TopicPartition> fetchablePartitions() {
+        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
         if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
@@ -530,16 +533,16 @@ public class Fetcher<K, V> {
     private Map<Node, FetchRequest> createFetchRequests() {
         // create the fetch info
         Cluster cluster = metadata.fetch();
-        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
+        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
         for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
             } else if (this.client.pendingRequestCount(node) == 0) {
                 // if there is a leader and no in-flight requests, issue a new fetch
-                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
+                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {
-                    fetch = new HashMap<>();
+                    fetch = new LinkedHashMap<>();
                     fetchable.put(node, fetch);
                 }
 
@@ -553,9 +556,9 @@ public class Fetcher<K, V> {
 
         // create the fetches
         Map<Node, FetchRequest> requests = new HashMap<>();
-        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+        for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
             requests.put(node, fetch);
         }
         return requests;
@@ -590,14 +593,11 @@ public class Fetcher<K, V> {
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                boolean skippedRecords = false;
                 for (LogEntry logEntry : records) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
-                    } else {
-                        skippedRecords = true;
                     }
                 }
 
@@ -609,19 +609,6 @@ public class Fetcher<K, V> {
                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                } else if (buffer.limit() > 0 && !skippedRecords) {
-                    // we did not read a single message from a non-empty buffer
-                    // because that message's size is larger than fetch size, in this case
-                    // record this exception
-                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
-                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
-                            + recordTooLargePartitions
-                            + " whose size is larger than the fetch size "
-                            + this.fetchSize
-                            + " and hence cannot be ever returned."
-                            + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
-                            + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
-                            recordTooLargePartitions);
                 }
             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                     || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
@@ -648,6 +635,11 @@ public class Fetcher<K, V> {
             completedFetch.metricAggregator.record(tp, bytes, recordsCount);
         }
 
+        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
+        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
+        if (bytes > 0 || partition.errorCode != Errors.NONE.code())
+            subscriptions.movePartitionToEnd(tp);
+
         return parsedRecords;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index dd0bce9..9029417 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,18 +16,21 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.PartitionStates;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
- * is "assigned" either directly with {@link #assignFromUser(Collection)} (manual assignment)
+ * is "assigned" either directly with {@link #assignFromUser(Set)} (manual assignment)
  * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
@@ -68,8 +71,8 @@ public class SubscriptionState {
     /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
     private final Set<String> groupSubscription;
 
-    /* the list of partitions currently assigned */
-    private final Map<TopicPartition, TopicPartitionState> assignment;
+    /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
+    private final PartitionStates<TopicPartitionState> assignment;
 
     /* do we need to request the latest committed offsets from the coordinator? */
     private boolean needsFetchCommittedOffsets;
@@ -84,7 +87,7 @@ public class SubscriptionState {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
         this.userAssignment = Collections.emptySet();
-        this.assignment = new HashMap<>();
+        this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
         this.subscribedPattern = null;
@@ -156,13 +159,17 @@ public class SubscriptionState {
     public void assignFromUser(Set<TopicPartition> partitions) {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
-        if (!this.assignment.keySet().equals(partitions)) {
+        if (!this.assignment.partitionSet().equals(partitions)) {
             this.userAssignment = partitions;
 
-            for (TopicPartition partition : partitions)
-                if (!assignment.containsKey(partition))
-                    addAssignedPartition(partition);
-            this.assignment.keySet().retainAll(this.userAssignment);
+            Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
+            for (TopicPartition partition : partitions) {
+                TopicPartitionState state = assignment.stateValue(partition);
+                if (state == null)
+                    state = new TopicPartitionState();
+                partitionToState.put(partition, state);
+            }
+            this.assignment.set(partitionToState);
             this.needsFetchCommittedOffsets = true;
         }
     }
@@ -179,13 +186,18 @@ public class SubscriptionState {
             if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
 
-        // after rebalancing, we always reinitialize the assignment state
-        this.assignment.clear();
-        for (TopicPartition tp: assignments)
-            addAssignedPartition(tp);
+        // after rebalancing, we always reinitialize the assignment value
+        this.assignment.set(partitionToStateMap(assignments));
         this.needsFetchCommittedOffsets = true;
     }
 
+    private Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
+        Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
+        for (TopicPartition tp : assignments)
+            map.put(tp, new TopicPartitionState());
+        return map;
+    }
+
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
@@ -218,11 +230,9 @@ public class SubscriptionState {
 
     public Set<TopicPartition> pausedPartitions() {
         HashSet<TopicPartition> paused = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            final TopicPartitionState state = entry.getValue();
-            if (state.paused) {
-                paused.add(tp);
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            if (state.value().paused) {
+                paused.add(state.topicPartition());
             }
         }
         return paused;
@@ -243,7 +253,7 @@ public class SubscriptionState {
     }
 
     private TopicPartitionState assignedState(TopicPartition tp) {
-        TopicPartitionState state = this.assignment.get(tp);
+        TopicPartitionState state = this.assignment.stateValue(tp);
         if (state == null)
             throw new IllegalStateException("No current assignment for partition " + tp);
         return state;
@@ -274,14 +284,14 @@ public class SubscriptionState {
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return this.assignment.keySet();
+        return this.assignment.partitionSet();
     }
 
-    public Set<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> fetchable = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
-            if (entry.getValue().isFetchable())
-                fetchable.add(entry.getKey());
+    public List<TopicPartition> fetchablePartitions() {
+        List<TopicPartition> fetchable = new ArrayList<>();
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            if (state.value().isFetchable())
+                fetchable.add(state.topicPartition());
         }
         return fetchable;
     }
@@ -300,10 +310,9 @@ public class SubscriptionState {
 
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
-            TopicPartitionState state = entry.getValue();
-            if (state.hasValidPosition())
-                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            if (state.value().hasValidPosition())
+                allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position));
         }
         return allConsumed;
     }
@@ -329,7 +338,7 @@ public class SubscriptionState {
     }
 
     public boolean hasAllFetchPositions() {
-        for (TopicPartitionState state : assignment.values())
+        for (TopicPartitionState state : assignment.partitionStateValues())
             if (!state.hasValidPosition())
                 return false;
         return true;
@@ -337,14 +346,15 @@ public class SubscriptionState {
 
     public Set<TopicPartition> missingFetchPositions() {
         Set<TopicPartition> missing = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
-            if (!entry.getValue().hasValidPosition())
-                missing.add(entry.getKey());
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            if (!state.value().hasValidPosition())
+                missing.add(state.topicPartition());
+        }
         return missing;
     }
 
     public boolean isAssigned(TopicPartition tp) {
-        return assignment.containsKey(tp);
+        return assignment.contains(tp);
     }
 
     public boolean isPaused(TopicPartition tp) {
@@ -363,8 +373,8 @@ public class SubscriptionState {
         assignedState(tp).resume();
     }
 
-    private void addAssignedPartition(TopicPartition tp) {
-        this.assignment.put(tp, new TopicPartitionState());
+    public void movePartitionToEnd(TopicPartition tp) {
+        assignment.moveToEnd(tp);
     }
 
     public ConsumerRebalanceListener listener() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
new file mode 100644
index 0000000..49823c0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via
+ * round-robin to ensure fairness and some level of determinism given the existence of a limit on the fetch response
+ * size. Because the serialization of fetch requests is more efficient if all partitions for the same topic are grouped
+ * together, we do such grouping in the method `set`.
+ *
+ * As partitions are moved to the end, the same topic may be repeated more than once. In the optimal case, a single
+ * topic would "wrap around" and appear twice. However, as partitions are fetched in different orders and partition
+ * leadership changes, we will deviate from the optimal. If this turns out to be an issue in practice, we can improve
+ * it by tracking the partitions per node or calling `set` every so often.
+ */
+public class PartitionStates<S> {
+
+    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
+
+    public PartitionStates() {}
+
+    public void moveToEnd(TopicPartition topicPartition) {
+        S state = map.remove(topicPartition);
+        if (state != null)
+            map.put(topicPartition, state);
+    }
+
+    public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
+        map.remove(topicPartition);
+        map.put(topicPartition, state);
+    }
+
+    public void remove(TopicPartition topicPartition) {
+        map.remove(topicPartition);
+    }
+
+    /**
+     * Returns the partitions in random order.
+     */
+    public Set<TopicPartition> partitionSet() {
+        return new HashSet<>(map.keySet());
+    }
+
+    public void clear() {
+        map.clear();
+    }
+
+    public boolean contains(TopicPartition topicPartition) {
+        return map.containsKey(topicPartition);
+    }
+
+    /**
+     * Returns the partition states in order.
+     */
+    public List<PartitionState<S>> partitionStates() {
+        List<PartitionState<S>> result = new ArrayList<>();
+        for (Map.Entry<TopicPartition, S> entry : map.entrySet()) {
+            result.add(new PartitionState<>(entry.getKey(), entry.getValue()));
+        }
+        return result;
+    }
+
+    /**
+     * Returns the partition state values in order.
+     */
+    public List<S> partitionStateValues() {
+        return new ArrayList<>(map.values());
+    }
+
+    public S stateValue(TopicPartition topicPartition) {
+        return map.get(topicPartition);
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+    /**
+     * Update the builder to have the received map as its state (i.e. the previous state is cleared). The builder will
+     * "batch by topic", so if we have a, b and c, each with two partitions, we may end up with something like the
+     * following (the order of topics and partitions within topics is dependent on the iteration order of the received
+     * map): a0, a1, b1, b0, c0, c1.
+     */
+    public void set(Map<TopicPartition, S> partitionToState) {
+        map.clear();
+        update(partitionToState);
+    }
+
+    private void update(Map<TopicPartition, S> partitionToState) {
+        LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new LinkedHashMap<>();
+        for (TopicPartition tp : partitionToState.keySet()) {
+            List<TopicPartition> partitions = topicToPartitions.get(tp.topic());
+            if (partitions == null) {
+                partitions = new ArrayList<>();
+                topicToPartitions.put(tp.topic(), partitions);
+            }
+            partitions.add(tp);
+        }
+        for (Map.Entry<String, List<TopicPartition>> entry : topicToPartitions.entrySet()) {
+            for (TopicPartition tp : entry.getValue()) {
+                S state = partitionToState.get(tp);
+                map.put(tp, state);
+            }
+        }
+    }
+
+    public static class PartitionState<S> {
+        private final TopicPartition topicPartition;
+        private final S value;
+
+        public PartitionState(TopicPartition topicPartition, S state) {
+            this.topicPartition = Objects.requireNonNull(topicPartition);
+            this.value = Objects.requireNonNull(state);
+        }
+
+        public S value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PartitionState<?> that = (PartitionState<?>) o;
+
+            return topicPartition.equals(that.topicPartition) && value.equals(that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = topicPartition.hashCode();
+            result = 31 * result + value.hashCode();
+            return result;
+        }
+
+        public TopicPartition topicPartition() {
+            return topicPartition;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionState(" + topicPartition + "=" + value + ')';
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index bda4757..d64cf6d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -444,6 +444,26 @@ public class Protocol {
     // Only the version number is incremented to indicate the client support message format V1 which uses
     // relative offset and has timestamp.
     public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
+    // FETCH_REQUEST_V3 added top level max_bytes field - the total size of partition data to accumulate in response.
+    // The partition ordering is now relevant - partitions will be processed in order they appear in request.
+    public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
+                                                                       INT32,
+                                                                       "Broker id of the follower. For normal consumers, use -1."),
+                                                             new Field("max_wait_time",
+                                                                       INT32,
+                                                                       "Maximum time in ms to wait for the response."),
+                                                             new Field("min_bytes",
+                                                                       INT32,
+                                                                       "Minimum bytes to accumulate in the response."),
+                                                             new Field("max_bytes",
+                                                                       INT32,
+                                                                       "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                                                                       "if the first message in the first non-empty partition of the fetch is larger than this " +
+                                                                       "value, the message will still be returned to ensure that progress can be made."),
+                                                             new Field("topics",
+                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                                                                       "Topics to fetch in the order provided."));
+
     public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                   INT32,
                                                                                   "Topic partition id."),
@@ -470,9 +490,10 @@ public class Protocol {
     // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
     // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
     public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
+    public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2};
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index f8b7fe3..dcdfd9c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -14,7 +14,7 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 public class FetchRequest extends AbstractRequest {
 
@@ -35,6 +34,9 @@ public class FetchRequest extends AbstractRequest {
     private static final String MIN_BYTES_KEY_NAME = "min_bytes";
     private static final String TOPICS_KEY_NAME = "topics";
 
+    // request and partition level name
+    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -42,12 +44,15 @@ public class FetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
-    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
+    // default values for older versions where a request level limit did not exist
+    public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
 
     private final int replicaId;
     private final int maxWait;
     private final int minBytes;
-    private final Map<TopicPartition, PartitionData> fetchData;
+    private final int maxBytes;
+    private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
     public static final class PartitionData {
         public final long offset;
@@ -59,29 +64,79 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
+    static final class TopicAndPartitionData<T> {
+        public final String topic;
+        public final LinkedHashMap<Integer, T> partitions;
+
+        public TopicAndPartitionData(String topic) {
+            this.topic = topic;
+            this.partitions = new LinkedHashMap<>();
+        }
+
+        public static <T> List<TopicAndPartitionData<T>> batchByTopic(LinkedHashMap<TopicPartition, T> data) {
+            List<TopicAndPartitionData<T>> topics = new ArrayList<>();
+            for (Map.Entry<TopicPartition, T> topicEntry : data.entrySet()) {
+                String topic = topicEntry.getKey().topic();
+                int partition = topicEntry.getKey().partition();
+                T partitionData = topicEntry.getValue();
+                if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
+                    topics.add(new TopicAndPartitionData(topic));
+                topics.get(topics.size() - 1).partitions.put(partition, partitionData);
+            }
+            return topics;
+        }
+    }
+
     /**
-     * Create a non-replica fetch request
+     * Create a non-replica fetch request for versions 0, 1 or 2 (the version is set via the RequestHeader).
      */
+    @Deprecated
     public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+        // Any of 0, 1 or 2 would do here since the schemas for these versions are identical
+        this(2, CONSUMER_REPLICA_ID, maxWait, minBytes, DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
     }
 
     /**
-     * Create a replica fetch request
+     * Create a non-replica fetch request for the current version.
      */
-    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
+    public FetchRequest(int maxWait, int minBytes, int maxBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+        this(ProtoUtils.latestVersion(ApiKeys.FETCH.id), CONSUMER_REPLICA_ID, maxWait, minBytes, maxBytes, fetchData);
+    }
+
+    /**
+     * Create a replica fetch request for versions 0, 1 or 2 (the actual version is determined by the RequestHeader).
+     */
+    @Deprecated
+    public static FetchRequest fromReplica(int replicaId, int maxWait, int minBytes,
+                                           Map<TopicPartition, PartitionData> fetchData) {
+        // Any of 0, 1 or 2 would do here since the schemas for these versions are identical
+        return new FetchRequest(2, replicaId, maxWait, minBytes, DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
+    }
+
+    /**
+     * Create a replica fetch request for the current version.
+     */
+    public static FetchRequest fromReplica(int replicaId, int maxWait, int minBytes, int maxBytes,
+                                           LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+        return new FetchRequest(ProtoUtils.latestVersion(ApiKeys.FETCH.id), replicaId, maxWait, minBytes, maxBytes, fetchData);
+    }
+
+    private FetchRequest(int version, int replicaId, int maxWait, int minBytes, int maxBytes,
+                         LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, version)));
+        List<TopicAndPartitionData<PartitionData>> topicsData = TopicAndPartitionData.batchByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
+        if (version >= 3)
+            struct.set(MAX_BYTES_KEY_NAME, maxBytes);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
+        for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
             List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
@@ -96,6 +151,7 @@ public class FetchRequest extends AbstractRequest {
         this.replicaId = replicaId;
         this.maxWait = maxWait;
         this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
         this.fetchData = fetchData;
     }
 
@@ -104,7 +160,11 @@ public class FetchRequest extends AbstractRequest {
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
         minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
-        fetchData = new HashMap<TopicPartition, PartitionData>();
+        if (struct.hasField(MAX_BYTES_KEY_NAME))
+            maxBytes = struct.getInt(MAX_BYTES_KEY_NAME);
+        else
+            maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
+        fetchData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -121,7 +181,7 @@ public class FetchRequest extends AbstractRequest {
 
     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
@@ -134,6 +194,8 @@ public class FetchRequest extends AbstractRequest {
             case 0:
                 return new FetchResponse(responseData);
             case 1:
+            case 2:
+            case 3:
                 return new FetchResponse(responseData, 0);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -153,6 +215,10 @@ public class FetchRequest extends AbstractRequest {
         return minBytes;
     }
 
+    public int maxBytes() {
+        return maxBytes;
+    }
+
     public Map<TopicPartition, PartitionData> fetchData() {
         return fetchData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f28472f..111d197 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -21,11 +21,10 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +64,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
-    private final Map<TopicPartition, PartitionData> responseData;
+    private final LinkedHashMap<TopicPartition, PartitionData> responseData;
     private final int throttleTime;
 
     public static final class PartitionData {
@@ -85,28 +84,61 @@ public class FetchResponse extends AbstractRequestResponse {
      * @param responseData fetched data grouped by topic-partition
      */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
-        initCommonFields(responseData);
-        this.responseData = responseData;
-        this.throttleTime = DEFAULT_THROTTLE_TIME;
+        this(0, new LinkedHashMap<>(responseData), DEFAULT_THROTTLE_TIME);
     }
 
-  /**
-   * Constructor for Version 1
-   * @param responseData fetched data grouped by topic-partition
-   * @param throttleTime Time in milliseconds the response was throttled
-   */
+    /**
+     * Constructor for Version 1 and 2
+     * @param responseData fetched data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
-        super(new Struct(CURRENT_SCHEMA));
-        initCommonFields(responseData);
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        // the schema for versions 1 and 2 is the same, so we pick 2 here
+        this(2, new LinkedHashMap<>(responseData), throttleTime);
+    }
+
+    /**
+     * Constructor for Version 3
+     * @param responseData fetched data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     */
+    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+        this(3, responseData, throttleTime);
+    }
+
+    private FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)));
+
+        List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
+        for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        if (version >= 1)
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+
         this.responseData = responseData;
         this.throttleTime = throttleTime;
     }
 
     public FetchResponse(Struct struct) {
         super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
+        LinkedHashMap<TopicPartition, PartitionData> responseData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -120,34 +152,11 @@ public class FetchResponse extends AbstractRequestResponse {
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
+        this.responseData = responseData;
         this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
     }
 
-    private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
-                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
-                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-    }
-
-
-    public Map<TopicPartition, PartitionData> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a910f2f..fd0794c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1078,6 +1078,7 @@ public class KafkaConsumerTest {
         long requestTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
+        int maxBytes = Integer.MAX_VALUE;
         int maxWaitMs = 500;
         int fetchSize = 1024 * 1024;
         int maxPollRecords = Integer.MAX_VALUE;
@@ -1116,6 +1117,7 @@ public class KafkaConsumerTest {
         Fetcher<String, String> fetcher = new Fetcher<>(
                 consumerClient,
                 minBytes,
+                maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 15bd9a2..88a0526 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -64,7 +63,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
@@ -80,6 +78,7 @@ public class FetcherTest {
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
     private long retryBackoffMs = 100;
@@ -315,25 +314,6 @@ public class FetcherTest {
         assertEquals(30L, consumerRecords.get(2).offset());
     }
 
-    @Test(expected = RecordTooLargeException.class)
-    public void testFetchRecordTooLarge() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
-
-        // prepare large record
-        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-        byte[] bytes = new byte[this.fetchSize];
-        new Random().nextBytes(bytes);
-        records.append(1L, 0L, null, bytes);
-        records.close();
-
-        // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.sendFetches();
-        client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
-        consumerClient.poll(0);
-        fetcher.fetchedRecords();
-    }
-
     @Test
     public void testUnauthorizedTopic() {
         subscriptions.assignFromUser(singleton(tp));
@@ -721,6 +701,7 @@ public class FetcherTest {
                                                int maxPollRecords) {
         return new Fetcher<>(consumerClient,
                 minBytes,
+                maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
new file mode 100644
index 0000000..66c7abc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PartitionStatesTest {
+
+    @Test
+    public void testSet() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.set(new LinkedHashMap<TopicPartition, String>());
+        checkState(states, new LinkedHashMap<TopicPartition, String>());
+    }
+
+    private LinkedHashMap<TopicPartition, String> createMap() {
+        LinkedHashMap<TopicPartition, String> map = new LinkedHashMap<>();
+        map.put(new TopicPartition("foo", 2), "foo 2");
+        map.put(new TopicPartition("blah", 2), "blah 2");
+        map.put(new TopicPartition("blah", 1), "blah 1");
+        map.put(new TopicPartition("baz", 2), "baz 2");
+        map.put(new TopicPartition("foo", 0), "foo 0");
+        map.put(new TopicPartition("baz", 3), "baz 3");
+        return map;
+    }
+
+    private void checkState(PartitionStates<String> states, LinkedHashMap<TopicPartition, String> expected) {
+        assertEquals(expected.keySet(), states.partitionSet());
+        assertEquals(expected.size(), states.size());
+        List<PartitionStates.PartitionState<String>> statesList = new ArrayList<>();
+        for (Map.Entry<TopicPartition, String> entry : expected.entrySet()) {
+            statesList.add(new PartitionStates.PartitionState<>(entry.getKey(), entry.getValue()));
+            assertTrue(states.contains(entry.getKey()));
+        }
+        assertEquals(statesList, states.partitionStates());
+    }
+
+    @Test
+    public void testMoveToEnd() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.moveToEnd(new TopicPartition("baz", 2));
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        checkState(states, expected);
+
+        states.moveToEnd(new TopicPartition("foo", 2));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        checkState(states, expected);
+
+        // no-op
+        states.moveToEnd(new TopicPartition("foo", 2));
+        checkState(states, expected);
+
+        // partition doesn't exist
+        states.moveToEnd(new TopicPartition("baz", 5));
+        checkState(states, expected);
+
+        // topic doesn't exist
+        states.moveToEnd(new TopicPartition("aaa", 2));
+        checkState(states, expected);
+    }
+
+    @Test
+    public void testUpdateAndMoveToEnd() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.updateAndMoveToEnd(new TopicPartition("foo", 0), "foo 0 updated");
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        checkState(states, expected);
+
+        states.updateAndMoveToEnd(new TopicPartition("baz", 2), "baz 2 updated");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        checkState(states, expected);
+
+        // partition doesn't exist
+        states.updateAndMoveToEnd(new TopicPartition("baz", 5), "baz 5 new");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        expected.put(new TopicPartition("baz", 5), "baz 5 new");
+        checkState(states, expected);
+
+        // topic doesn't exist
+        states.updateAndMoveToEnd(new TopicPartition("aaa", 2), "aaa 2 new");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        expected.put(new TopicPartition("baz", 5), "baz 5 new");
+        expected.put(new TopicPartition("aaa", 2), "aaa 2 new");
+        checkState(states, expected);
+    }
+
+    @Test
+    public void testPartitionValues() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        List<String> expected = new ArrayList<>();
+        expected.add("foo 2");
+        expected.add("foo 0");
+        expected.add("blah 2");
+        expected.add("blah 1");
+        expected.add("baz 2");
+        expected.add("baz 3");
+        assertEquals(expected, states.partitionStateValues());
+    }
+
+    @Test
+    public void testClear() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        states.clear();
+        checkState(states, new LinkedHashMap<TopicPartition, String>());
+    }
+
+    @Test
+    public void testRemove() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.remove(new TopicPartition("foo", 2));
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.remove(new TopicPartition("blah", 1));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.remove(new TopicPartition("baz", 3));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        checkState(states, expected);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index dd77e03..b3baa63 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,8 +50,8 @@ public class RequestResponseTest {
                 createControlledShutdownRequest(),
                 createControlledShutdownResponse(),
                 createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
-                createFetchRequest(),
-                createFetchRequest().getErrorResponse(1, new UnknownServerException()),
+                createFetchRequest(3),
+                createFetchRequest(3).getErrorResponse(3, new UnknownServerException()),
                 createFetchResponse(),
                 createHeartBeatRequest(),
                 createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
@@ -112,11 +113,12 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
+
+        checkOlderFetchVersions();
         checkSerialization(createMetadataResponse(0), 0);
         checkSerialization(createMetadataResponse(1), 1);
         checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()), 1);
-        checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(0), 0);
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(1), 1);
@@ -129,6 +131,14 @@ public class RequestResponseTest {
         checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
     }
 
+    private void checkOlderFetchVersions() throws Exception {
+        int latestVersion = ProtoUtils.latestVersion(ApiKeys.FETCH.id);
+        for (int i = 0; i < latestVersion; ++i) {
+            checkSerialization(createFetchRequest(i).getErrorResponse(i, new UnknownServerException()), i);
+            checkSerialization(createFetchRequest(i), i);
+        }
+    }
+
     private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
         req.writeTo(buffer);
@@ -219,11 +229,15 @@ public class RequestResponseTest {
         return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
-    private AbstractRequest createFetchRequest() {
-        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
+    @SuppressWarnings("deprecation")
+    private AbstractRequest createFetchRequest(int version) {
+        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
-        return new FetchRequest(-1, 100, 100000, fetchData);
+        if (version < 3)
+            return new FetchRequest(100, 100000, fetchData);
+        else
+            return new FetchRequest(100, 1000, 1000000, fetchData);
     }
 
     private AbstractRequestResponse createFetchResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 58bdb7a..700c54d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -82,7 +82,6 @@ object ConfigCommand extends Config {
     val entity = parseEntity(opts)
     val entityType = entity.root.entityType
     val entityName = entity.fullSanitizedName
-    warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
     val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -99,22 +98,9 @@ object ConfigCommand extends Config {
     println(s"Updated config for entity: $entity.")
   }
 
-  def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
-    val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
-      case n: String => n.toInt
-      case _ => -1
-    }
-    if (maxMessageBytes > Defaults.MaxMessageSize){
-      error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
-      if (!force)
-        TopicCommand.askToProceed
-    }
-  }
-
   private def parseBroker(broker: String): Int = {
-    try {
-      broker.toInt
-    }catch {
+    try broker.toInt
+    catch {
       case e: NumberFormatException =>
         throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 421486c..a35a989 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -97,13 +97,11 @@ object TopicCommand extends Logging {
     try {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt))
         AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
       } else {
         CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt))
         val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                             else RackAwareMode.Enforced
         AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
@@ -357,20 +355,6 @@ object TopicCommand extends Logging {
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
     }
   }
-  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = {
-    val maxMessageBytes =  configs.get(LogConfig.MaxMessageBytesProp) match {
-      case n: String => n.toInt
-      case _ => -1
-    }
-    if (maxMessageBytes > Defaults.MaxMessageSize)
-      if (replicas > 1) {
-        error(longMessageSizeWarning(maxMessageBytes))
-        if (!force)
-          askToProceed
-      }
-      else
-        warn(shortMessageSizeWarning(maxMessageBytes))
-  }
 
   def askToProceed: Unit = {
     println("Are you sure you want to continue? [y/n]")
@@ -380,37 +364,5 @@ object TopicCommand extends Logging {
     }
   }
 
-  def shortMessageSizeWarning(maxMessageBytes: Int): String = {
-    "\n\n" +
-      "*****************************************************************************************************\n" +
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's    ***\n" +
-      "*** default max.message.bytes. This operation is potentially dangerous. Consumers will get        ***\n" +
-      s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}         ***\n"+
-      "*** (new consumer) < the value you are using.                                                     ***\n" +
-      "*****************************************************************************************************\n" +
-      s"- value set here: $maxMessageBytes\n" +
-      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
-      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" +
-      s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n"
-  }
-
-  def longMessageSizeWarning(maxMessageBytes: Int): String = {
-    "\n\n" +
-      "*****************************************************************************************************\n" +
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's    ***\n" +
-      "*** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***\n" +
-      "*** - Consumers will get failures if their fetch.message.max.bytes (old consumer) or              ***\n" +
-      s"***   ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new consumer) < the value you are using                          ***\n" +
-      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have     ***\n" +
-      "***   a higher risk of data loss                                                                  ***\n" +
-      "*** You should ensure both of these settings are greater than the value set here before using     ***\n" +
-      "*** this topic.                                                                                   ***\n" +
-      "*****************************************************************************************************\n" +
-      s"- value set here: $maxMessageBytes\n" +
-      s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
-      s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" +
-      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
-      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index d955225..cdd418d 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -54,7 +54,10 @@ object ApiVersion {
     "0.10.0" -> KAFKA_0_10_0_IV1,
 
     // introduced for JoinGroup protocol change in KIP-62
-    "0.10.1-IV0" -> KAFKA_0_10_1_IV0
+    "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
+    "0.10.1" -> KAFKA_0_10_1_IV1
   )
 
   private val versionPattern = "\\.".r
@@ -120,3 +123,9 @@ case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 6
 }
+
+case object KAFKA_0_10_1_IV1 extends ApiVersion {
+  val version: String = "0.10.1-IV1"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 7
+}


Mime
View raw message