kafka-commits mailing list archives

From ij...@apache.org
Subject [07/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:44:00 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 9c33f71..7c029ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -29,11 +23,18 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.Utils;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 public class FetchRequest extends AbstractRequest {
     public static final int CONSUMER_REPLICA_ID = -1;
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
     private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
     private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+    private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // request and partition level name
@@ -54,6 +55,7 @@ public class FetchRequest extends AbstractRequest {
     private final int maxWait;
     private final int minBytes;
     private final int maxBytes;
+    private final IsolationLevel isolationLevel;
     private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
     public static final class PartitionData {
@@ -159,6 +161,7 @@ public class FetchRequest extends AbstractRequest {
         this.minBytes = minBytes;
         this.maxBytes = maxBytes;
         this.fetchData = fetchData;
+        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
     }
 
     public FetchRequest(Struct struct, short version) {
@@ -170,6 +173,12 @@ public class FetchRequest extends AbstractRequest {
             maxBytes = struct.getInt(MAX_BYTES_KEY_NAME);
         else
             maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
+
+        if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
+            isolationLevel = IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME));
+        else
+            isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+
         fetchData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -191,8 +200,7 @@ public class FetchRequest extends AbstractRequest {
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e),
-                    FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY);
-
+                    FetchResponse.INVALID_LSO, FetchResponse.INVALID_HIGHWATERMARK, null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse(responseData, 0);
@@ -222,6 +230,10 @@ public class FetchRequest extends AbstractRequest {
         return replicaId >= 0;
     }
 
+    public IsolationLevel isolationLevel() {
+        return isolationLevel;
+    }
+
     public static FetchRequest parse(ByteBuffer buffer, short version) {
         return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
     }
@@ -237,6 +249,9 @@ public class FetchRequest extends AbstractRequest {
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
         if (version >= 3)
             struct.set(MAX_BYTES_KEY_NAME, maxBytes);
+        if (version >= 4)
+            struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
+
         List<Struct> topicArray = new ArrayList<>();
         for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 79b1a5c..617ebe0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.record.Records;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,9 +50,18 @@ public class FetchResponse extends AbstractResponse {
     private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+    private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
+    private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
+    private static final String RECORD_SET_KEY_NAME = "record_set";
+
+    // aborted transaction field names
+    private static final String PID_KEY_NAME = "pid";
+    private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
 
-    // Default throttle time
     private static final int DEFAULT_THROTTLE_TIME = 0;
+    public static final long INVALID_HIGHWATERMARK = -1L;
+    public static final long INVALID_LSO = -1L;
 
     /**
      * Possible error codes:
@@ -63,29 +73,48 @@ public class FetchResponse extends AbstractResponse {
      *  UNKNOWN (-1)
      */
 
-    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
-    private static final String RECORD_SET_KEY_NAME = "record_set";
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
-
     private final LinkedHashMap<TopicPartition, PartitionData> responseData;
     private final int throttleTimeMs;
 
+    public static final class AbortedTransaction {
+        public final long pid;
+        public final long firstOffset;
+
+        public AbortedTransaction(long pid, long firstOffset) {
+            this.pid = pid;
+            this.firstOffset = firstOffset;
+        }
+
+        @Override
+        public String toString() {
+            return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
+        }
+    }
+
     public static final class PartitionData {
         public final Errors error;
+        public final long lastStableOffset;
         public final long highWatermark;
+        public final List<AbortedTransaction> abortedTransactions;
         public final Records records;
 
-        public PartitionData(Errors error, long highWatermark, Records records) {
+        public PartitionData(Errors error,
+                             long highWatermark,
+                             long lastStableOffset,
+                             List<AbortedTransaction> abortedTransactions,
+                             Records records) {
             this.error = error;
             this.highWatermark = highWatermark;
+            this.lastStableOffset = lastStableOffset;
+            this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
 
         @Override
         public String toString() {
-            return "(error=" + error.toString() + ", highWaterMark=" + highWatermark +
-                    ", records=" + records + ")";
+            return "(error=" + error + ", highWaterMark=" + highWatermark +
+                    ", lastStableOffset = " + lastStableOffset + ", " +
+                    "abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
         }
     }
 
@@ -114,8 +143,28 @@ public class FetchResponse extends AbstractResponse {
                 int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
                 Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
                 long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
+                long lastStableOffset = INVALID_LSO;
+                if (partitionResponse.hasField(LAST_STABLE_OFFSET_KEY_NAME))
+                    lastStableOffset = partitionResponse.getLong(LAST_STABLE_OFFSET_KEY_NAME);
+
                 Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(error, highWatermark, records);
+
+                List<AbortedTransaction> abortedTransactions = Collections.emptyList();
+                if (partitionResponse.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
+                    Object[] abortedTransactionsArray = partitionResponse.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
+                    if (abortedTransactionsArray != null) {
+                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
+                        for (Object abortedTransactionObj : abortedTransactionsArray) {
+                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
+                            long pid = abortedTransactionStruct.getLong(PID_KEY_NAME);
+                            long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
+                            abortedTransactions.add(new AbortedTransaction(pid, firstOffset));
+                        }
+                    }
+                }
+
+                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset,
+                        abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
@@ -225,6 +274,24 @@ public class FetchResponse extends AbstractResponse {
                 partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+
+                if (version >= 4) {
+                    partitionDataHeader.set(LAST_STABLE_OFFSET_KEY_NAME, fetchPartitionData.lastStableOffset);
+
+                    if (fetchPartitionData.abortedTransactions == null) {
+                        partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, null);
+                    } else {
+                        List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
+                        for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
+                            Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
+                            abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.pid);
+                            abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
+                            abortedTransactionStructs.add(abortedTransactionStruct);
+                        }
+                        partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
+                    }
+                }
+
                 partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
                 partitionArray.add(partitionData);

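For illustration only (not part of this commit): a minimal sketch of building the expanded FetchResponse.PartitionData, assuming the kafka-clients classes at this revision are on the classpath; the class name and the offsets below are made up. The constructor order is (error, highWatermark, lastStableOffset, abortedTransactions, records), and AbortedTransaction pairs a producer id (pid) with the first offset of that transaction.

import java.util.Collections;
import java.util.LinkedHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

public class FetchResponsePartitionDataSketch {
    public static void main(String[] args) {
        // One aborted transaction from producer id 1234 starting at offset 42.
        FetchResponse.AbortedTransaction aborted = new FetchResponse.AbortedTransaction(1234L, 42L);

        // error, highWatermark, lastStableOffset (LSO), abortedTransactions, records
        FetchResponse.PartitionData partitionData = new FetchResponse.PartitionData(
                Errors.NONE, 100L, 90L, Collections.singletonList(aborted), MemoryRecords.EMPTY);

        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
        responseData.put(new TopicPartition("my-topic", 0), partitionData);
        FetchResponse response = new FetchResponse(responseData, 0 /* throttle time ms */);

        // toString now includes the lastStableOffset and abortedTransactions fields.
        System.out.println(partitionData);
    }
}
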
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java b/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
new file mode 100644
index 0000000..a09b625
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+public enum IsolationLevel {
+    READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
+
+    private final byte id;
+
+    IsolationLevel(byte id) {
+        this.id = id;
+    }
+
+    public byte id() {
+        return id;
+    }
+
+    public static IsolationLevel forId(byte id) {
+        switch (id) {
+            case 0:
+                return READ_UNCOMMITTED;
+            case 1:
+                return READ_COMMITTED;
+            default:
+                throw new IllegalArgumentException("Unknown isolation level " + id);
+        }
+    }
+}

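A quick round-trip for the new IsolationLevel enum (illustration only, not part of this commit; the class name below is made up):

import org.apache.kafka.common.requests.IsolationLevel;

public class IsolationLevelRoundTrip {
    public static void main(String[] args) {
        // The wire representation is a single byte: 0 = READ_UNCOMMITTED, 1 = READ_COMMITTED.
        byte id = IsolationLevel.READ_COMMITTED.id();
        System.out.println(IsolationLevel.forId(id));    // READ_COMMITTED

        // Unknown ids are rejected rather than silently defaulted.
        try {
            IsolationLevel.forId((byte) 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());          // Unknown isolation level 2
        }
    }
}
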
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index df3b4fc..0010ad6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
@@ -29,10 +32,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 public class ProduceRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String ACKS_KEY_NAME = "acks";
     private static final String TIMEOUT_KEY_NAME = "timeout";
     private static final String TOPIC_DATA_KEY_NAME = "topic_data";
@@ -46,12 +51,17 @@ public class ProduceRequest extends AbstractRequest {
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
+        private final byte magic;
         private final short acks;
         private final int timeout;
         private final Map<TopicPartition, MemoryRecords> partitionRecords;
 
-        public Builder(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
-            super(ApiKeys.PRODUCE);
+        public Builder(byte magic,
+                       short acks,
+                       int timeout,
+                       Map<TopicPartition, MemoryRecords> partitionRecords) {
+            super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? 3 : 2));
+            this.magic = magic;
             this.acks = acks;
             this.timeout = timeout;
             this.partitionRecords = partitionRecords;
@@ -69,6 +79,7 @@ public class ProduceRequest extends AbstractRequest {
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("(type=ProduceRequest")
+                    .append(", magic=").append(magic)
                     .append(", acks=").append(acks)
                     .append(", timeout=").append(timeout)
                     .append(", partitionRecords=(").append(Utils.mkString(partitionRecords))
@@ -79,6 +90,7 @@ public class ProduceRequest extends AbstractRequest {
 
     private final short acks;
     private final int timeout;
+    private final String transactionalId;
 
     private final Map<TopicPartition, Integer> partitionSizes;
 
@@ -91,8 +103,14 @@ public class ProduceRequest extends AbstractRequest {
         super(version);
         this.acks = acks;
         this.timeout = timeout;
+
+        // TODO: Include transactional id in constructor once transactions are supported
+        this.transactionalId = null;
         this.partitionRecords = partitionRecords;
         this.partitionSizes = createPartitionSizes(partitionRecords);
+
+        for (MemoryRecords records : partitionRecords.values())
+            validateRecords(version, records);
     }
 
     private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
@@ -112,12 +130,36 @@ public class ProduceRequest extends AbstractRequest {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+                validateRecords(version, records);
                 partitionRecords.put(new TopicPartition(topic, partition), records);
             }
         }
         partitionSizes = createPartitionSizes(partitionRecords);
         acks = struct.getShort(ACKS_KEY_NAME);
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
+        transactionalId = struct.hasField(TRANSACTIONAL_ID_KEY_NAME) ? struct.getString(TRANSACTIONAL_ID_KEY_NAME) : null;
+    }
+
+    private void validateRecords(short version, MemoryRecords records) {
+        if (version >= 3) {
+            Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+            if (!iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
+                        "one record batch");
+
+            MutableRecordBatch entry = iterator.next();
+            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
+                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
+                        "contain record batches with magic version 2");
+
+            if (iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
+                        "contain exactly one record batch");
+        }
+
+        // Note that we do not do similar validation for older versions to ensure compatibility with
+        // clients which send the wrong magic version in the wrong version of the produce request. The broker
+        // did not do this validation before, so we maintain that behavior here.
     }
 
     /**
@@ -127,16 +169,21 @@ public class ProduceRequest extends AbstractRequest {
     public Struct toStruct() {
         // Store it in a local variable to protect against concurrent updates
         Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
-        Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version()));
+        short version = version();
+        Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version));
         Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
+
+        if (struct.hasField(TRANSACTIONAL_ID_KEY_NAME))
+            struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+
         List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
-        for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) {
+        for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
             Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) {
+            for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
                 MemoryRecords records = partitionEntry.getValue();
                 Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
                         .set(PARTITION_KEY_NAME, partitionEntry.getKey())
@@ -183,6 +230,7 @@ public class ProduceRequest extends AbstractRequest {
             case 0:
             case 1:
             case 2:
+            case 3:
                 return new ProduceResponse(responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -202,6 +250,10 @@ public class ProduceRequest extends AbstractRequest {
         return timeout;
     }
 
+    public String transactionalId() {
+        return transactionalId;
+    }
+
     /**
      * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
      */
@@ -221,4 +273,26 @@ public class ProduceRequest extends AbstractRequest {
     public static ProduceRequest parse(ByteBuffer buffer, short version) {
         return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version);
     }
+
+    public static byte requiredMagicForVersion(short produceRequestVersion) {
+        switch (produceRequestVersion) {
+            case 0:
+            case 1:
+                return RecordBatch.MAGIC_VALUE_V0;
+
+            case 2:
+                return RecordBatch.MAGIC_VALUE_V1;
+
+            case 3:
+                return RecordBatch.MAGIC_VALUE_V2;
+
+            default:
+                // raise an exception if the version has not been explicitly added to this method.
+                // this ensures that we cannot accidentally use the wrong magic value if we forget
+                // to update this method on a bump to the produce request version.
+                throw new IllegalArgumentException("Magic value to use for produce request version " +
+                        produceRequestVersion + " is not known");
+        }
+    }
+
 }

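For illustration only (not part of this commit): a sketch of the magic-aware ProduceRequest.Builder and the new version/magic mapping, assuming the kafka-clients classes at this revision; the class name and the acks/timeout values are made up. Passing RecordBatch.MAGIC_VALUE_V2 makes the builder target produce version 3, which in turn requires v2 record batches.

import java.util.Collections;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceRequest;

public class ProduceRequestBuilderSketch {
    public static void main(String[] args) {
        ProduceRequest.Builder builder = new ProduceRequest.Builder(
                RecordBatch.MAGIC_VALUE_V2,                          // message format magic
                (short) 1,                                           // acks
                30000,                                               // timeout ms
                Collections.<TopicPartition, MemoryRecords>emptyMap());
        System.out.println(builder);

        // Inverse mapping: produce v3 requires the new v2 (KIP-98) batch format.
        byte magic = ProduceRequest.requiredMagicForVersion((short) 3);
        System.out.println(magic == RecordBatch.MAGIC_VALUE_V2);     // true
    }
}
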
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index cae6e94..152391e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
@@ -152,7 +152,7 @@ public class ProduceResponse extends AbstractResponse {
         public long logAppendTime;
 
         public PartitionResponse(Errors error) {
-            this(error, INVALID_OFFSET, Record.NO_TIMESTAMP);
+            this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP);
         }
 
         public PartitionResponse(Errors error, long baseOffset, long logAppendTime) {
@@ -179,4 +179,5 @@ public class ProduceResponse extends AbstractResponse {
     public static ProduceResponse parse(ByteBuffer buffer, short version) {
         return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index f779371..687d246 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.nio.ByteBuffer;
 import java.util.zip.Checksum;
 
 /**
@@ -56,6 +57,20 @@ public class Crc32 implements Checksum {
         return crc.getValue();
     }
 
+    /**
+     * Compute the CRC32 of a byte buffer from a given offset (relative to the buffer's current position)
+     *
+     * @param buffer The buffer with the underlying data
+     * @param offset The offset relative to the current position
+     * @param size The number of bytes beginning from the offset to include
+     * @return The CRC32
+     */
+    public static long crc32(ByteBuffer buffer, int offset, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(buffer, offset, size);
+        return crc.getValue();
+    }
+
     /** the current CRC value, bit-flipped */
     private int crc;
 
@@ -74,6 +89,20 @@ public class Crc32 implements Checksum {
         crc = 0xffffffff;
     }
 
+    public void update(ByteBuffer buffer, int length) {
+        update(buffer, 0, length);
+    }
+
+    public void update(ByteBuffer buffer, int offset, int length) {
+        if (buffer.hasArray()) {
+            update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length);
+        } else {
+            int start = buffer.position() + offset;
+            for (int i = start; i < start + length; i++)
+                update(buffer.get(i));
+        }
+    }
+
     @Override
     public void update(byte[] b, int off, int len) {
         if (off < 0 || len < 0 || off > b.length - len)

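For illustration only (not part of this commit): the new ByteBuffer-based CRC helpers compute the checksum relative to the buffer's current position without moving it. The class name below is made up.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Crc32;

public class Crc32BufferSketch {
    public static void main(String[] args) {
        byte[] payload = "hello, kafka".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.wrap(payload);

        long fromBuffer = Crc32.crc32(buffer, 0, buffer.remaining()); // new ByteBuffer overload
        long fromArray = Crc32.crc32(payload, 0, payload.length);     // existing byte[] overload

        System.out.println(fromBuffer == fromArray);   // true: same bytes, same CRC
        System.out.println(buffer.position());         // 0: the position is untouched
    }
}
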
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6dce978..696d145 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,11 +30,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -80,11 +81,35 @@ public class Utils {
      * @return The string
      */
     public static String utf8(byte[] bytes) {
-        try {
-            return new String(bytes, "UTF8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException("This shouldn't happen.", e);
-        }
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Read a UTF8 string from a byte buffer. Note that the position of the byte buffer is not affected
+     * by this method.
+     *
+     * @param buffer The buffer to read from
+     * @param length The length of the string in bytes
+     * @return The UTF8 string
+     */
+    public static String utf8(ByteBuffer buffer, int length) {
+        return utf8(buffer, 0, length);
+    }
+
+    /**
+     * Read a UTF8 string from a byte buffer at a given offset. Note that the position of the byte buffer
+     * is not affected by this method.
+     *
+     * @param buffer The buffer to read from
+     * @param offset The offset relative to the current position in the buffer
+     * @param length The length of the string in bytes
+     * @return The UTF8 string
+     */
+    public static String utf8(ByteBuffer buffer, int offset, int length) {
+        if (buffer.hasArray())
+            return new String(buffer.array(), buffer.arrayOffset() + buffer.position() + offset, length, StandardCharsets.UTF_8);
+        else
+            return utf8(toArray(buffer, offset, length));
     }
 
     /**
@@ -94,11 +119,7 @@ public class Utils {
      * @return The byte[]
      */
     public static byte[] utf8(String string) {
-        try {
-            return string.getBytes("UTF8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException("This shouldn't happen.", e);
-        }
+        return string.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -153,10 +174,11 @@ public class Utils {
     }
 
     /**
-     * Read the given byte buffer into a byte array
+     * Read the given byte buffer from its current position to its limit into a byte array.
+     * @param buffer The buffer to read from
      */
     public static byte[] toArray(ByteBuffer buffer) {
-        return toArray(buffer, 0, buffer.limit());
+        return toArray(buffer, 0, buffer.remaining());
     }
 
     /**
@@ -179,13 +201,17 @@ public class Utils {
 
     /**
      * Read a byte array from the given offset and size in the buffer
+     * @param buffer The buffer to read from
+     * @param offset The offset relative to the current position of the buffer
+     * @param size The number of bytes to read into the array
      */
     public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
         byte[] dest = new byte[size];
         if (buffer.hasArray()) {
-            System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
+            System.arraycopy(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, dest, 0, size);
         } else {
             int pos = buffer.position();
+            buffer.position(pos + offset);
             buffer.get(dest);
             buffer.position(pos);
         }
@@ -684,16 +710,6 @@ public class Utils {
     }
 
     /**
-     * Compute the checksum of a range of data
-     * @param buffer Buffer containing the data to checksum
-     * @param start Offset in the buffer to read from
-     * @param size The number of bytes to include
-     */
-    public static long computeChecksum(ByteBuffer buffer, int start, int size) {
-        return Crc32.crc32(buffer.array(), buffer.arrayOffset() + start, size);
-    }
-
-    /**
      * Read data from the channel to the given byte buffer until there are no bytes remaining in the buffer. If the end
      * of the file is reached while there are bytes remaining in the buffer, an EOFException is thrown.
      *
@@ -745,4 +761,29 @@ public class Utils {
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
 
+    /**
+     * Write the contents of a buffer to an output stream. The bytes are copied from the current position
+     * in the buffer.
+     * @param out The output to write to
+     * @param buffer The buffer to write from
+     * @param length The number of bytes to write
+     * @throws IOException For any errors writing to the output
+     */
+    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+        if (buffer.hasArray()) {
+            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
+        } else {
+            int pos = buffer.position();
+            for (int i = pos; i < length + pos; i++)
+                out.writeByte(buffer.get(i));
+        }
+    }
+
+    public static <T> List<T> toList(Iterator<T> iterator) {
+        List<T> res = new ArrayList<>();
+        while (iterator.hasNext())
+            res.add(iterator.next());
+        return res;
+    }
+
 }

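For illustration only (not part of this commit): the utf8(...) and toArray(...) helpers above now read relative to the buffer's current position and leave that position unchanged. A small sketch (class name made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class UtilsBufferSketch {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("header::payload".getBytes(StandardCharsets.UTF_8));
        buffer.position(8);                               // skip "header::"

        String value = Utils.utf8(buffer, 0, 7);          // reads "payload" at position + 0
        byte[] copy = Utils.toArray(buffer);              // copies position..limit, i.e. "payload"

        System.out.println(value);                                        // payload
        System.out.println(new String(copy, StandardCharsets.UTF_8));     // payload
        System.out.println(buffer.position());                            // still 8
    }
}
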
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
new file mode 100644
index 0000000..654a606
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class ApiVersionsTest {
+
+    @Test
+    public void testMaxUsableProduceMagic() {
+        ApiVersions apiVersions = new ApiVersions();
+        assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+
+        apiVersions.update("0", NodeApiVersions.create());
+        assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+
+        apiVersions.update("1", NodeApiVersions.create(Collections.singleton(
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+        assertEquals(RecordBatch.MAGIC_VALUE_V1, apiVersions.maxUsableProduceMagic());
+
+        apiVersions.remove("1");
+        assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 3726f1a..b871a49 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -65,7 +65,6 @@ public class MockClient implements KafkaClient {
     private final Time time;
     private final Metadata metadata;
     private Set<String> unavailableTopics;
-    private int correlation = 0;
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
@@ -149,11 +148,13 @@ public class MockClient implements KafkaClient {
             FutureResponse futureResp = iterator.next();
             if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
                 continue;
-            short usableVersion = nodeApiVersions.usableVersion(request.requestBuilder().apiKey());
-            AbstractRequest abstractRequest = request.requestBuilder().build(usableVersion);
+
+            AbstractRequest.Builder<?> builder = request.requestBuilder();
+            short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
+            AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
                 throw new IllegalStateException("Next in line response did not match expected request");
-            ClientResponse resp = new ClientResponse(request.makeHeader(usableVersion), request.callback(), request.destination(),
+            ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index de60f44..5af2a26 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -61,17 +62,18 @@ public class NetworkClientTest {
 
     private NetworkClient createNetworkClient() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, true);
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new ApiVersions());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-                "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time, true);
+                "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs,
+                time, true, new ApiVersions());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, false);
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions());
     }
 
     @Before
@@ -111,7 +113,8 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
 
-        ProduceRequest.Builder builder = new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
+        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+                Collections.<TopicPartition, MemoryRecords>emptyMap());
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
         client.send(request, time.milliseconds());
         assertEquals("There should be 1 in-flight request after send", 1,
@@ -126,8 +129,8 @@ public class NetworkClientTest {
 
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest.Builder builder =
-                new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
+        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+                        Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = networkClient.newClientRequest(
                 node.idString(), builder, time.milliseconds(), true, handler);
@@ -157,7 +160,7 @@ public class NetworkClientTest {
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
 
-    protected void awaitReady(NetworkClient client, Node node) {
+    private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
             maybeSetExpectedApiVersionsResponse();
         }
@@ -169,8 +172,8 @@ public class NetworkClientTest {
     @Test
     public void testRequestTimeout() {
         awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest.Builder builder =
-                new ProduceRequest.Builder((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
+        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1,
+                1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         long now = time.milliseconds();
         ClientRequest request = client.newClientRequest(

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index a244244..1f24e7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -110,6 +110,36 @@ public class NodeApiVersionsTest {
         assertEquals(2, versions.usableVersion(ApiKeys.FETCH));
     }
 
+    @Test
+    public void testUsableVersionNoDesiredVersionReturnsLatestUsable() {
+        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, null));
+    }
+
+    @Test
+    public void testDesiredVersion() {
+        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
+        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1));
+        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3));
+    }
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testDesiredVersionTooLarge() {
+        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
+        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3);
+    }
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testDesiredVersionTooSmall() {
+        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
+        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0);
+    }
+
     @Test(expected = UnsupportedVersionException.class)
     public void testUsableVersionCalculationNoKnownVersions() {
         List<ApiVersion> versionList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d3fc547..7fb3552 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1445,10 +1445,18 @@ public class KafkaConsumerTest {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;
             int fetchCount = fetchEntry.getValue().count;
-            MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset);
-            for (int i = 0; i < fetchCount; i++)
-                records.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, records.build()));
+            final MemoryRecords records;
+            if (fetchCount == 0) {
+                records = MemoryRecords.EMPTY;
+            } else {
+                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                        TimestampType.CREATE_TIME, fetchOffset);
+                for (int i = 0; i < fetchCount; i++)
+                    builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+                records = builder.build();
+            }
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LSO,
+                    null, records));
         }
         return new FetchResponse(tpResponses, 0);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9cce874..8bf8a63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -801,7 +801,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
-        unavailableTopicTest(true, false, Collections.<String>singleton("notmatching"));
+        unavailableTopicTest(true, false, Collections.singleton("notmatching"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index de527f8..92150a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -40,11 +40,13 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -56,6 +58,7 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -73,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 
 import static java.util.Collections.singleton;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -157,6 +161,46 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetcherIgnoresControlRecords() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, "key".getBytes(), null);
+        builder.appendControlRecord(0L, ControlRecordType.COMMIT, null);
+        builder.append(0L, "key".getBytes(), null);
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 3L);
+        builder.appendControlRecord(0L, ControlRecordType.ABORT, null);
+        builder.close();
+
+        buffer.flip();
+
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp));
+
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
+        assertEquals(2, records.size());
+
+        // TODO: currently the offset does not advance beyond the control record until a record
+        // with a larger offset is fetched. In the worst case, we may fetch the control record
+        // again after a rebalance, but that should be fine since we just discard it anyway
+        assertEquals(3L, subscriptions.position(tp).longValue());
+        for (ConsumerRecord<byte[], byte[]> record : records)
+            assertArrayEquals("key".getBytes(), record.key());
+    }
+
+    @Test
     public void testFetchError() {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
@@ -220,25 +264,25 @@ public class FetcherTest {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
 
-        byte magic = Record.CURRENT_MAGIC_VALUE;
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
         byte[] key = "foo".getBytes();
         byte[] value = "baz".getBytes();
         long offset = 0;
         long timestamp = 500L;
 
-        int size = Record.recordSize(key, value);
-        byte attributes = Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
-        long crc = Record.computeChecksum(magic, attributes, timestamp, key, value);
+        int size = LegacyRecord.recordSize(key, value);
+        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
+        long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);
 
         // write one valid record
         out.writeLong(offset);
         out.writeInt(size);
-        Record.write(out, magic, crc, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
+        LegacyRecord.write(out, magic, crc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
 
         // and one invalid record (note the crc)
         out.writeLong(offset);
         out.writeInt(size);
-        Record.write(out, magic, crc + 1, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
+        LegacyRecord.write(out, magic, crc + 1, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
 
         buffer.flip();
 
@@ -259,6 +303,34 @@ public class FetcherTest {
     }
 
     @Test
+    public void testParseInvalidRecordBatch() throws Exception {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        ByteBuffer buffer = records.buffer();
+
+        // flip some bits to fail the crc
+        buffer.putInt(32, buffer.get(32) ^ 87238423);
+
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have raised");
+        } catch (KafkaException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(0, subscriptions.position(tp).longValue());
+        }
+    }
+
+    @Test
     public void testFetchMaxPollRecords() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
@@ -298,7 +370,8 @@ public class FetcherTest {
         // if we are fetching from a compacted topic, there may be gaps in the returned records
         // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case
 
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
         builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
         builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
         builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
@@ -741,9 +814,10 @@ public class FetcherTest {
         for (int i = 1; i < 4; i++) {
             // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
             // and filtered out by consumer.
-            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                    TimestampType.CREATE_TIME, 0L);
             for (int v = 0; v < 3; v++)
-                builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+                builder.appendWithOffset((long) i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
             List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp);
             assertEquals(3, records.size());
         }
@@ -780,9 +854,10 @@ public class FetcherTest {
         assertEquals(100, partitionLag.value(), EPSILON);
 
         // recordsFetchLagMax should be hw - offset of the last message after receiving a non-empty FetchResponse
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
         for (int v = 0; v < 3; v++)
-            builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+            builder.appendWithOffset((long) v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
         fetchRecords(builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);
 
@@ -890,9 +965,9 @@ public class FetcherTest {
     }
 
     private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
-        return new FetchResponse(
-                new LinkedHashMap<>(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records))),
-                throttleTime);
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LSO, null, records));
+        return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 
     private MetadataResponse newMetadataResponse(String topic, Errors error) {

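For reference, a minimal standalone sketch (not part of this patch; class name, payloads and printed output are illustrative) of the pattern the new testParseInvalidRecordBatch exercises: build a small magic v2 batch with the public record classes this commit introduces, then flip bytes inside the CRC-covered region so that any reader validating the batch CRC should reject it.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;

    public class CorruptBatchSketch {
        public static void main(String[] args) {
            // Build a small magic v2 batch, mirroring the call used in the test above.
            MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                    CompressionType.NONE, TimestampType.CREATE_TIME,
                    new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                    new SimpleRecord(2L, "b".getBytes(), "2".getBytes()));
            ByteBuffer buffer = records.buffer();

            // Corrupt bytes past the batch header's CRC field, as the test does; the stored
            // CRC no longer matches the data, so parsing is expected to fail.
            buffer.putInt(32, buffer.get(32) ^ 87238423);

            System.out.println("corrupted v2 batch of " + buffer.remaining() + " bytes");
        }
    }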
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 9eab418..90ff16a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.junit.Test;
 
 public class RecordSendTest {
@@ -46,7 +46,7 @@ public class RecordSendTest {
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult(topicPartition);
         FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
-                                                               Record.NO_TIMESTAMP, 0, 0, 0);
+                                                               RecordBatch.NO_TIMESTAMP, 0, 0, 0);
         assertFalse("Request is not completed", future.isDone());
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -54,7 +54,7 @@ public class RecordSendTest {
         } catch (TimeoutException e) { /* this is good */
         }
 
-        request.set(baseOffset, Record.NO_TIMESTAMP, null);
+        request.set(baseOffset, RecordBatch.NO_TIMESTAMP, null);
         request.done();
         assertTrue(future.isDone());
         assertEquals(baseOffset + relOffset, future.get().offset());
@@ -66,7 +66,7 @@ public class RecordSendTest {
     @Test(expected = ExecutionException.class)
     public void testError() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
-                                                               relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
+                                                               relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
         future.get();
     }
 
@@ -76,7 +76,7 @@ public class RecordSendTest {
     @Test
     public void testBlocking() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
-                                                               relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
+                                                               relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 
@@ -87,7 +87,7 @@ public class RecordSendTest {
             public void run() {
                 try {
                     sleep(timeout);
-                    request.set(baseOffset, Record.NO_TIMESTAMP, error);
+                    request.set(baseOffset, RecordBatch.NO_TIMESTAMP, error);
                     request.done();
                 } catch (InterruptedException e) { }
             }

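The change above is purely a rename: the "no timestamp" sentinel moved from Record to RecordBatch. A compact sketch of the same future/request wiring outside JUnit (assuming the constructor arguments exactly as the test passes them; the offsets are illustrative):

    import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
    import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.RecordBatch;

    public class RecordSendSketch {
        public static void main(String[] args) throws Exception {
            long baseOffset = 45L;
            long relOffset = 5L;

            // Same wiring as testTimeout above, with RecordBatch.NO_TIMESTAMP replacing Record.NO_TIMESTAMP.
            ProduceRequestResult request = new ProduceRequestResult(new TopicPartition("test", 0));
            FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
                    RecordBatch.NO_TIMESTAMP, 0, 0, 0);

            // Completing the request unblocks the future; the metadata offset is base + relative.
            request.set(baseOffset, RecordBatch.NO_TIMESTAMP, null);
            request.done();
            System.out.println("offset = " + future.get().offset()); // 50
        }
    }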
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 3258ba3..6895fce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
@@ -31,8 +31,8 @@ public class ProducerBatchTest {
 
     private final long now = 1488748346917L;
 
-    private final MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(0),
-            Record.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);
+    private final MemoryRecordsBuilder memoryRecordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(128),
+            CompressionType.NONE, TimestampType.CREATE_TIME, 128);
 
     /**
      * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create

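The same MemoryRecords.builder factory appears throughout the updated tests in this patch; a small self-contained sketch of that builder pattern (buffer size, offsets and payloads are illustrative), where the trailing long argument is the base offset of the batch:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    public class BuilderSketch {
        public static void main(String[] args) {
            // Factory method used by the updated tests; the final argument is the base offset.
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(0L, 1L, "key".getBytes(), "value-0".getBytes());
            builder.appendWithOffset(1L, 2L, "key".getBytes(), "value-1".getBytes());
            MemoryRecords records = builder.build();

            // records() iterates individual Record instances, replacing the old deepEntries()/LogEntry view.
            for (Record record : records.records())
                System.out.println(record.offset() + " @ " + record.timestamp());
        }
    }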
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 42dc4c4..9cc863b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
@@ -25,9 +26,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
@@ -69,7 +70,7 @@ public class RecordAccumulatorTest {
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
-    private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key, value);
     private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3),
             Collections.<String>emptySet(), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
@@ -83,19 +84,26 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        int batchSize = 1024;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10L * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
-        int appends = batchSize / msgSize;
+
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize,
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+        int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
             Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
             assertEquals(1, partitionBatches.size());
-            assertTrue(partitionBatches.peekFirst().isWritable());
+
+            ProducerBatch batch = partitionBatches.peekFirst();
+            assertTrue(batch.isWritable());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
 
         // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
+
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
@@ -108,11 +116,11 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         ProducerBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
+        Iterator<Record> iter = batch.records().records().iterator();
         for (int i = 0; i < appends; i++) {
-            LogEntry entry = iter.next();
-            assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-            assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+            Record record = iter.next();
+            assertEquals("Keys should match", ByteBuffer.wrap(key), record.key());
+            assertEquals("Values should match", ByteBuffer.wrap(value), record.value());
         }
         assertFalse("No more records", iter.hasNext());
     }
@@ -120,7 +128,8 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
         accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -128,7 +137,8 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -137,16 +147,17 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         ProducerBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
-        LogEntry entry = iter.next();
-        assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-        assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
+        Iterator<Record> iter = batch.records().records().iterator();
+        Record record = iter.next();
+        assertEquals("Keys should match", ByteBuffer.wrap(key), record.key());
+        assertEquals("Values should match", ByteBuffer.wrap(value), record.value());
         assertFalse("No more records", iter.hasNext());
     }
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -165,7 +176,8 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -189,7 +201,7 @@ public class RecordAccumulatorTest {
             List<ProducerBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (ProducerBatch batch : batches) {
-                    for (LogEntry entry : batch.records().deepEntries())
+                    for (Record record : batch.records().records())
                         read++;
                     accum.deallocate(batch);
                 }
@@ -205,9 +217,14 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time);
+
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
         // Just short of going over the limit so we trigger linger time
-        int appends = 1024 / msgSize;
+        int appends = expectedNumAppends(batchSize);
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
@@ -239,7 +256,8 @@ public class RecordAccumulatorTest {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
 
         long now = time.milliseconds();
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -276,7 +294,8 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -309,7 +328,8 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAwaitFlushComplete() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions());
         accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -328,7 +348,8 @@ public class RecordAccumulatorTest {
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -353,8 +374,12 @@ public class RecordAccumulatorTest {
         long lingerMs = 3000L;
         int requestTimeout = 60;
 
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
-        int appends = 1024 / msgSize;
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+        int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
         for (int i = 0; i < appends; i++) {
@@ -421,12 +446,12 @@ public class RecordAccumulatorTest {
         long retryBackoffMs = 100L;
         long lingerMs = 3000L;
         int requestTimeout = 60;
-        int messagesPerBatch = 1024 / msgSize;
+        int messagesPerBatch = expectedNumAppends(1024);
 
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs,
-                retryBackoffMs, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
         final AtomicInteger expiryCallbackCount = new AtomicInteger();
-        final AtomicReference<Exception> unexpectedException = new AtomicReference<Exception>();
+        final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
         Callback callback = new Callback() {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -461,8 +486,12 @@ public class RecordAccumulatorTest {
     @Test
     public void testMutedPartitions() throws InterruptedException {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
-        int appends = 1024 / msgSize;
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions());
+        int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
@@ -489,4 +518,19 @@ public class RecordAccumulatorTest {
         drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
     }
+
+    /**
+     * Return the offset delta.
+     */
+    private int expectedNumAppends(int batchSize) {
+        int size = 0;
+        int offsetDelta = 0;
+        while (true) {
+            int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, key, value);
+            if (size + recordSize > batchSize)
+                return offsetDelta;
+            offsetDelta += 1;
+            size += recordSize;
+        }
+    }
 }
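The expectedNumAppends helper added above replaces the old batchSize / msgSize arithmetic because record sizes are no longer constant in the v2 format: the offset delta is varint-encoded, so records later in a batch can take a few more bytes. A tiny sketch (values are illustrative) of that effect using the same DefaultRecord.sizeInBytes call the test relies on:

    import org.apache.kafka.common.record.DefaultRecord;

    public class RecordSizeSketch {
        public static void main(String[] args) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();

            // The offset delta is varint-encoded in the v2 record format, so a record's size
            // depends on its position within the batch.
            int sizeAtDelta0 = DefaultRecord.sizeInBytes(0, 0L, key, value);
            int sizeAtDelta1000 = DefaultRecord.sizeInBytes(1000, 0L, key, value);
            System.out.println("delta 0: " + sizeAtDelta0 + " bytes, delta 1000: " + sizeAtDelta1000 + " bytes");
        }
    }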

