kafka-commits mailing list archives

From jun...@apache.org
Subject [4/5] kafka git commit: KAFKA-3025; Added timestamp to Message and use relative offset.
Date Fri, 19 Feb 2016 15:56:51 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index fa06be9..3ef5c8b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -42,8 +43,8 @@ public class MockConsumerTest {
         beginningOffsets.put(new TopicPartition("test", 1), 0L);
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, "key1", "value1");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, "key2", "value2");
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
         ConsumerRecords<String, String> recs = consumer.poll(1);

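This hunk tracks the new ConsumerRecord constructor, which now takes a timestamp and a TimestampType between the offset and the key. A minimal sketch of feeding such a record through a MockConsumer using the seven-argument constructor shown above (topic name, key, and value are hypothetical):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.TimestampType;

    public class TimestampedRecordSketch {
        public static void main(String[] args) {
            MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            TopicPartition tp = new TopicPartition("test", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            consumer.seek(tp, 0);
            // The timestamp (0L) and TimestampType now sit between the offset and the key.
            consumer.addRecord(new ConsumerRecord<>("test", 0, 0L, 0L,
                TimestampType.CREATE_TIME, "key", "value"));
            System.out.println(consumer.poll(1).count()); // 1
        }
    }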
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 45210a8..25843c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,7 +43,8 @@ public class ConsumerInterceptorsTest {
     private final TopicPartition tp = new TopicPartition(topic, partition);
     private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
     private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
-    private final ConsumerRecord<Integer, Integer> consumerRecord = new ConsumerRecord<>(topic, partition, 0, 1, 1);
+    private final ConsumerRecord<Integer, Integer> consumerRecord =
+        new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 1, 1);
     private int onCommitCount = 0;
     private int onConsumeCount = 0;
 
@@ -115,9 +117,9 @@ public class ConsumerInterceptorsTest {
         List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
         list1.add(consumerRecord);
         List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
-        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 1, 1));
+        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
         List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
-        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 1, 1));
+        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
         records.put(tp, list1);
         records.put(filterTopicPart1, list2);
         records.put(filterTopicPart2, list3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5e750fd..97c3d85 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
@@ -95,9 +96,9 @@ public class FetcherTest {
         metadata.update(cluster, time.milliseconds());
         client.setNode(node);
 
-        records.append(1L, "key".getBytes(), "value-1".getBytes());
-        records.append(2L, "key".getBytes(), "value-2".getBytes());
-        records.append(3L, "key".getBytes(), "value-3".getBytes());
+        records.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
+        records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
+        records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
         records.close();
     }
 
@@ -133,9 +134,9 @@ public class FetcherTest {
         // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case
 
         MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-        records.append(15L, "key".getBytes(), "value-1".getBytes());
-        records.append(20L, "key".getBytes(), "value-2".getBytes());
-        records.append(30L, "key".getBytes(), "value-3".getBytes());
+        records.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
+        records.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
+        records.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
         records.close();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
@@ -164,7 +165,7 @@ public class FetcherTest {
         MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
         byte[] bytes = new byte[this.fetchSize];
         new Random().nextBytes(bytes);
-        records.append(1L, null, bytes);
+        records.append(1L, 0L, null, bytes);
         records.close();
 
         // resize the limit of the buffer to pretend it is only fetch-size large
@@ -469,7 +470,7 @@ public class FetcherTest {
             if (i > 1) {
                 this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
                 for (int v = 0; v < 3; v++) {
-                    this.records.append((long) i * 3 + v, "key".getBytes(), String.format("value-%d", v).getBytes());
+                    this.records.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
                 }
                 this.records.close();
             }

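MemoryRecords.append now takes the record timestamp as its second argument, and Record.NO_TIMESTAMP (used in the last hunk) is the sentinel for "no timestamp assigned". A minimal sketch of writing and re-reading one entry under the signatures shown above:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class AppendSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
            // offset, timestamp, key, value -- the timestamp argument is new
            records.append(0L, System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
            records.close();
            for (LogEntry entry : records)
                System.out.println(entry.offset() + ": " + entry.record());
        }
    }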
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index 7bb181e..f3db098 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -45,7 +45,7 @@ public class ProducerRecordTest {
         ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<String, Integer>("test", 1 , "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
-        ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<String, Integer>("topic", null, null, null);
+        ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<String, Integer>("topic", null, null, null, null);
         assertEquals(nullFieldsRecord, nullFieldsRecord);
         assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
     }

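ProducerRecord likewise gains an optional timestamp, which is why the null-fields case above now passes five arguments (topic, partition, timestamp, key, value). A sketch of the new constructor with a hypothetical topic; passing null for the timestamp leaves it to be assigned at send time:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerRecordSketch {
        public static void main(String[] args) {
            // topic, partition, timestamp, key, value
            ProducerRecord<String, Integer> rec =
                new ProducerRecord<String, Integer>("topic", null, 1455897600000L, "key", 42);
            System.out.println(rec);
        }
    }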
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 1e5d1c2..5591129 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 
 public class RecordSendTest {
@@ -44,7 +45,7 @@ public class RecordSendTest {
     @Test
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult();
-        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset);
+        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, Record.NO_TIMESTAMP);
         assertFalse("Request is not completed", future.isDone());
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -62,7 +63,7 @@ public class RecordSendTest {
      */
     @Test(expected = ExecutionException.class)
     public void testError() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset);
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset, Record.NO_TIMESTAMP);
         future.get();
     }
 
@@ -71,7 +72,7 @@ public class RecordSendTest {
      */
     @Test
     public void testBlocking() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset);
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset, Record.NO_TIMESTAMP);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 18a455f..26d15d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -128,7 +128,7 @@ public class ProducerInterceptorsTest {
         ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
 
         // verify onAck is called on all interceptors
-        RecordMetadata meta = new RecordMetadata(tp, 0, 0);
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0);
         interceptors.onAcknowledgement(meta, null);
         assertEquals(2, onAckCount);
 

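RecordMetadata now carries the message timestamp next to the offset, which is what the extra constructor argument above supplies (base offset, relative offset, then timestamp). A sketch of constructing it the way the interceptor test does; the timestamp() accessor is assumed to be part of this change:

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class MetadataSketch {
        public static void main(String[] args) {
            // base offset 100, relative offset 5, timestamp 0L
            RecordMetadata meta = new RecordMetadata(new TopicPartition("test", 0), 100L, 5L, 0L);
            System.out.println(meta.offset() + " @ " + meta.timestamp()); // 105 @ 0
        }
    }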
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 723e450..0f95ee5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -77,10 +77,10 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -99,7 +99,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
-        accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
@@ -107,7 +107,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -129,7 +129,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, null, maxBlockTimeMs);
+                accum.append(tp, 0L, key, value, null, maxBlockTimeMs);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -150,7 +150,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, null, maxBlockTimeMs);
+                            accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, null, maxBlockTimeMs);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -189,7 +189,7 @@ public class RecordAccumulatorTest {
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -198,14 +198,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, null, maxBlockTimeMs);
+            accum.append(tp3, 0L, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, null, maxBlockTimeMs);
+            accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -220,7 +220,7 @@ public class RecordAccumulatorTest {
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
 
         long now = time.milliseconds();
-        accum.append(tp1, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -232,7 +232,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, key, value, null, maxBlockTimeMs);
+        accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
@@ -256,7 +256,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         
@@ -287,7 +287,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
@@ -304,12 +304,12 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         time.sleep(2000);
         accum.ready(cluster, now);
-        accum.append(tp1, key, value, null, 0);
+        accum.append(tp1, 0L, key, value, null, 0);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
         Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());

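Every RecordAccumulator.append call picks up the record's create-time as its second argument, so the timestamp travels with the record into its batch. A sketch of the new call shape, constructing the accumulator the same way these tests do (the block-timeout value is arbitrary):

    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.utils.SystemTime;

    public class AccumAppendSketch {
        public static void main(String[] args) throws Exception {
            RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,
                CompressionType.NONE, 10L, 100L, new Metrics(), new SystemTime());
            // The timestamp is now threaded through as the second argument.
            accum.append(new TopicPartition("test", 0), System.currentTimeMillis(),
                "key".getBytes(), "value".getBytes(), null, 1000L);
        }
    }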
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 14a839b..b983de5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -93,7 +94,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -112,7 +113,7 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
             sender.run(time.milliseconds());
@@ -141,7 +142,7 @@ public class SenderTest {
                                        "clientId",
                                        REQUEST_TIMEOUT);
             // do a successful retry
-            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().request().destination();
@@ -162,7 +163,7 @@ public class SenderTest {
             assertEquals(offset, future.get().offset());
 
             // do an unsuccessful retry
-            future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().request().destination());
@@ -188,7 +189,7 @@ public class SenderTest {
     }
 
     private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset, Record.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
         return response.toStruct();

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 6e3a9ac..ed64f63 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -45,13 +45,13 @@ public class MemoryRecordsTest {
     public void testIterator() {
         MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
         MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
-        List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
-                                          new Record("b".getBytes(), "2".getBytes()),
-                                          new Record("c".getBytes(), "3".getBytes()));
+        List<Record> list = Arrays.asList(new Record(0L, "a".getBytes(), "1".getBytes()),
+                                          new Record(0L, "b".getBytes(), "2".getBytes()),
+                                          new Record(0L, "c".getBytes(), "3".getBytes()));
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
             recs1.append(i, r);
-            recs2.append(i, toArray(r.key()), toArray(r.value()));
+            recs2.append(i, 0L, toArray(r.key()), toArray(r.value()));
         }
         recs1.close();
         recs2.close();

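The Record constructor itself now leads with the timestamp, before the key, value, and (optionally) the compression type. A minimal sketch of both constructor forms introduced by this patch:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;

    public class RecordCtorSketch {
        public static void main(String[] args) {
            Record plain = new Record(0L, "a".getBytes(), "1".getBytes());
            Record zipped = new Record(0L, "a".getBytes(), "1".getBytes(), CompressionType.GZIP);
            // checksum() reads the stored CRC; computeChecksum() recomputes it
            System.out.println(plain.checksum() == plain.computeChecksum());   // true
            System.out.println(zipped.checksum() == zipped.computeChecksum()); // true
        }
    }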
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 957fc8f..6482529 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -35,16 +35,18 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(value = Parameterized.class)
 public class RecordTest {
 
+    private long timestamp;
     private ByteBuffer key;
     private ByteBuffer value;
     private CompressionType compression;
     private Record record;
 
-    public RecordTest(byte[] key, byte[] value, CompressionType compression) {
+    public RecordTest(long timestamp, byte[] key, byte[] value, CompressionType compression) {
+        this.timestamp = timestamp;
         this.key = key == null ? null : ByteBuffer.wrap(key);
         this.value = value == null ? null : ByteBuffer.wrap(value);
         this.compression = compression;
-        this.record = new Record(key, value, compression);
+        this.record = new Record(timestamp, key, value, compression);
     }
 
     @Test
@@ -64,6 +66,7 @@ public class RecordTest {
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
         assertEquals(record.checksum(), Record.computeChecksum(
+            this.timestamp,
             this.key == null ? null : this.key.array(),
             this.value == null ? null : this.value.array(),
             this.compression, 0, -1));
@@ -99,10 +102,11 @@ public class RecordTest {
         byte[] payload = new byte[1000];
         Arrays.fill(payload, (byte) 1);
         List<Object[]> values = new ArrayList<Object[]>();
-        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
-            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
-                for (CompressionType compression : CompressionType.values())
-                    values.add(new Object[] {key, value, compression});
+        for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
+            for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+                for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+                    for (CompressionType compression : CompressionType.values())
+                        values.add(new Object[] {timestamp, key, value, compression});
         return values;
     }
 

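A consequence worth noting: the CRC now covers the timestamp field, which is why computeChecksum gains a timestamp parameter above. Two records that differ only in timestamp therefore no longer share a checksum, as this sketch illustrates:

    import org.apache.kafka.common.record.Record;

    public class ChecksumSketch {
        public static void main(String[] args) {
            Record a = new Record(1L, "key".getBytes(), "value".getBytes());
            Record b = new Record(2L, "key".getBytes(), "value".getBytes());
            // Same key and value, different timestamps: the CRCs differ.
            System.out.println(a.checksum() != b.checksum()); // true
        }
    }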
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5fc5551..edeaf63 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -82,7 +83,7 @@ public class RequestResponseTest {
                 createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceRequest().getErrorResponse(1, new UnknownServerException()),
+                createProduceRequest().getErrorResponse(2, new UnknownServerException()),
                 createProduceResponse(),
                 createStopReplicaRequest(),
                 createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
@@ -122,16 +123,19 @@ public class RequestResponseTest {
     @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
-
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
         ProduceResponse v0Response = new ProduceResponse(responseData);
-        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
+        ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1);
+        ProduceResponse v2Response = new ProduceResponse(responseData, 10, 2);
         assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
         assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+        assertEquals("Throttle time must be 10", 10, v2Response.getThrottleTime());
         assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0), v0Response.toStruct().schema());
         assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1), v1Response.toStruct().schema());
+        assertEquals("Should use schema version 2", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 2), v2Response.toStruct().schema());
         assertEquals("Response data does not match", responseData, v0Response.responses());
         assertEquals("Response data does not match", responseData, v1Response.responses());
+        assertEquals("Response data does not match", responseData, v2Response.responses());
     }
 
     @Test
@@ -316,7 +320,7 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
         return new ProduceResponse(responseData, 0);
     }
 

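The produce response now echoes a per-partition timestamp (meaningful for log-append-time topics, Record.NO_TIMESTAMP otherwise), and the v2 schema added here carries it on the wire. A sketch of building a v2 response the way these tests do:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.requests.ProduceResponse;

    public class ProduceResponseSketch {
        public static void main(String[] args) {
            ProduceResponse.PartitionResponse part =
                new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000L, Record.NO_TIMESTAMP);
            Map<TopicPartition, ProduceResponse.PartitionResponse> data =
                Collections.singletonMap(new TopicPartition("test", 0), part);
            // throttle time 10 ms, response version 2
            ProduceResponse response = new ProduceResponse(data, 10, 2);
            System.out.println(response.toStruct().schema());
        }
    }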
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 8295b54..3246578 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.TimestampType;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -55,7 +56,8 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
         for (TopicPartition tp : records.partitions()) {
             List<ConsumerRecord<String, String>> lst = new ArrayList<>();
             for (ConsumerRecord<String, String> record: records.records(tp)) {
-                lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase()));
+                lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
+                    0L, TimestampType.CREATE_TIME, record.key(), record.value().toUpperCase()));
             }
             recordMap.put(tp, lst);
         }

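Because ConsumerRecord is immutable, an interceptor that rewrites values has to rebuild each record, and after this patch it must supply the timestamp fields as well (the mock above simply hard-codes 0L and CREATE_TIME, which is fine for a test double). A sketch of the copying pattern that preserves the original fields, assuming the timestamp() and timestampType() accessors added by this change:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class UpperCaseCopy {
        static ConsumerRecord<String, String> upperCased(ConsumerRecord<String, String> r) {
            // Forward the original timestamp and its type when rebuilding.
            return new ConsumerRecord<>(r.topic(), r.partition(), r.offset(),
                r.timestamp(), r.timestampType(), r.key(), r.value().toUpperCase());
        }
    }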
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 04b08b3..978e3a1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -290,7 +291,7 @@ public class WorkerSinkTaskTest {
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                         for (int i = 0; i < numMessages; i++)
-                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE));
                         recordsReturned += numMessages;
                         return new ConsumerRecords<>(
                                 numMessages > 0 ?

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 3bf653e..e202209 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -519,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;
@@ -547,7 +548,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 59e9b86..c6eb0c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -356,7 +356,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             public Future<RecordMetadata> answer() throws Throwable {
                 synchronized (producerCallbacks) {
                     for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
-                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
+                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L), null);
                     }
                     producerCallbacks.reset();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index e007f02..e878e12 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -288,14 +289,14 @@ public class KafkaConfigStorageTest {
         expectConfigure();
         // Overwrite each type at least once to ensure we see the latest data after loading
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 // Connector after root update should make it through, task update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -342,12 +343,12 @@ public class KafkaConfigStorageTest {
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
                 // This is the record that has been compacted:
                 //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -483,7 +484,7 @@ public class KafkaConfigStorageTest {
                     public Future<Void> answer() throws Throwable {
                         TestFuture<Void> future = new TestFuture<Void>();
                         for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
-                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, entry.getKey(), entry.getValue()));
                         future.resolveOnGet((Void) null);
                         return future;
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 4e54bf1..61763a8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -125,10 +126,10 @@ public class KafkaOffsetBackingStoreTest {
     public void testReloadOnStart() throws Exception {
         expectConfigure();
         expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
-                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array())
         ));
         expectStop();
 
@@ -176,8 +177,8 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()));
                 secondGetReadToEndCallback.getValue().onCompletion(null, null);
                 return null;
             }
@@ -189,8 +190,8 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array()));
                 thirdGetReadToEndCallback.getValue().onCompletion(null, null);
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index ab370e3..b2246f5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -182,7 +183,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
                     }
                 });
                 consumer.scheduleNopPollTask();
@@ -190,7 +191,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
                     }
                 });
                 consumer.schedulePollTask(new Runnable() {
@@ -297,16 +298,16 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
                     }
                 });
 
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE_NEW));
                     }
                 });
 
@@ -362,8 +363,8 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 82a6612..276689a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -17,18 +17,18 @@
 
 package kafka.admin
 
-import joptsimple._
 import java.util.Properties
+
+import joptsimple._
 import kafka.admin.TopicCommand._
-import kafka.consumer.ConsumerConfig
 import kafka.log.{Defaults, LogConfig}
 import kafka.server.{ClientConfigOverride, ConfigType}
-import kafka.utils.{ZkUtils, CommandLineUtils}
-import org.I0Itec.zkclient.ZkClient
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.kafka.common.utils.Utils
+import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConversions._
+import scala.collection._
 
 
 /**
@@ -117,6 +117,10 @@ object ConfigCommand {
             "Invalid entity config: all configs to be added must be in the format \"key=val\".")
     val props = new Properties
     configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
+      println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
+        s"This configuration will be ignored if the value is on a version newer than the specified inter.broker.protocol.version in the broker.")
+    }
     props
   }
 

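For context, this warning would surface when the property is set through the config tool, for example with an invocation along the lines of bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config message.format.version=0.10.0 (a hypothetical example; check ConfigCommand's usage output for the exact flags). TopicCommand below adds the same warning on the topic create/alter path.
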
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index c17b5bc..d4212c5 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -17,21 +17,22 @@
 
 package kafka.admin
 
-import joptsimple._
 import java.util.Properties
-import kafka.common.{TopicExistsException, Topic, AdminCommandFailedException}
-import kafka.utils.CommandLineUtils
-import kafka.utils._
-import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import scala.collection._
-import scala.collection.JavaConversions._
-import kafka.log.{Defaults, LogConfig}
+
+import joptsimple._
+import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
 import kafka.consumer.{ConsumerConfig, Whitelist}
+import kafka.coordinator.GroupCoordinator
+import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
-import org.apache.kafka.common.utils.Utils
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.security.JaasUtils
-import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConversions._
+import scala.collection._
 
 
 object TopicCommand extends Logging {
@@ -235,6 +236,10 @@ object TopicCommand extends Logging {
     val props = new Properties
     configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
     LogConfig.validate(props)
+    if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
+      println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
+      s"This configuration will be ignored if the value is on a version newer than the specified inter.broker.protocol.version in the broker.")
+    }
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index c9c1976..6b5fb7c 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,6 +17,8 @@
 
 package kafka.api
 
+import kafka.message.Message
+
 /**
  * This class contains the different Kafka versions.
  * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
@@ -24,16 +26,29 @@ package kafka.api
  *
  * Note that the ID we initialize for each version is important.
  * We consider a version newer than another, if it has a higher ID (to avoid depending on lexicographic order)
+ *
+ * Since the API protocol may change more than once within the same release and to facilitate people deploying code from
+ * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
+ * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a
+ * corresponding case object KAFKA_0_10_0_IV0. We will also add a config value "0.10.0" that will be mapped to the
+ * latest internal version object, which is KAFKA_0_10_0_IV0. When we change the protocol a second time while developing
+ * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding case object KAFKA_0_10_0_IV1, and change the
+ * config value "0.10.0" to map to the latest internal version object KAFKA_0_10_0_IV1. The config value "0.10.0-IV0"
+ * will still map to KAFKA_0_10_0_IV0. This way, people deploying from trunk can use "0.10.0-IV0" and "0.10.0-IV1" to
+ * upgrade one internal version at a time, while most people, who just want to run a released version, can simply use
+ * "0.10.0" when upgrading to the 0.10.0 release.
  */
 object ApiVersion {
   // This implicit is necessary due to: https://issues.scala-lang.org/browse/SI-8541
   implicit def orderingByVersion[A <: ApiVersion]: Ordering[A] = Ordering.by(_.id)
 
   private val versionNameMap = Map(
-    "0.8.0" -> KAFKA_080,
-    "0.8.1" -> KAFKA_081,
-    "0.8.2" -> KAFKA_082,
-    "0.9.0" -> KAFKA_090
+    "0.8.0" -> KAFKA_0_8_0,
+    "0.8.1" -> KAFKA_0_8_1,
+    "0.8.2" -> KAFKA_0_8_2,
+    "0.9.0" -> KAFKA_0_9_0,
+    "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
+    "0.10.0" -> KAFKA_0_10_0_IV0
   )
 
   def apply(version: String): ApiVersion = versionNameMap(version.split("\\.").slice(0,3).mkString("."))
@@ -43,6 +58,7 @@ object ApiVersion {
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
   val version: String
+  val messageFormatVersion: Byte
   val id: Int
 
   override def compare(that: ApiVersion): Int = {
@@ -57,22 +73,32 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 }
 
 // Keep the IDs in order of versions
-case object KAFKA_080 extends ApiVersion {
+case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
+  val messageFormatVersion: Byte = Message.MagicValue_V0
   val id: Int = 0
 }
 
-case object KAFKA_081 extends ApiVersion {
+case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
+  val messageFormatVersion: Byte = Message.MagicValue_V0
   val id: Int = 1
 }
 
-case object KAFKA_082 extends ApiVersion {
+case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
+  val messageFormatVersion: Byte = Message.MagicValue_V0
   val id: Int = 2
 }
 
-case object KAFKA_090 extends ApiVersion {
+case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
+  val messageFormatVersion: Byte = Message.MagicValue_V0
   val id: Int = 3
 }
+
+case object KAFKA_0_10_0_IV0 extends ApiVersion {
+  val version: String = "0.10.0-IV0"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 4
+}

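Taken together, the new mapping, the ordering by id, and the per-version messageFormatVersion behave as in this sketch (all names taken from the diff above):

    import kafka.api.{ApiVersion, KAFKA_0_9_0, KAFKA_0_10_0_IV0}
    import kafka.message.Message

    assert(ApiVersion("0.10.0") == KAFKA_0_10_0_IV0)     // release name -> latest internal version
    assert(ApiVersion("0.10.0-IV0") == KAFKA_0_10_0_IV0) // internal versions stay addressable
    assert(KAFKA_0_9_0 < KAFKA_0_10_0_IV0)               // comparison is by id, not by string
    assert(KAFKA_0_10_0_IV0.messageFormatVersion == Message.MagicValue_V1) // magic v1 starts here
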
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b43b8f4..f47942c 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -33,7 +33,7 @@ import scala.collection.immutable.Map
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 object FetchRequest {
-  val CurrentVersion = 1.shortValue
+  val CurrentVersion = 2.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
   val DefaultCorrelationId = 0
@@ -151,6 +151,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
         (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
     }
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+    // Magic value does not matter here because the message set is empty
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 11f5009..30af841 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -27,7 +27,7 @@ import kafka.network.RequestChannel.Response
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 object ProducerRequest {
-  val CurrentVersion = 1.shortValue
+  val CurrentVersion = 2.shortValue
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
@@ -135,7 +135,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     else {
       val producerResponseStatus = data.map {
         case (topicAndPartition, data) =>
-          (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l))
+          (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1L, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 7e745cf..89d7538 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -18,6 +18,7 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+import kafka.message.Message
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.Map
@@ -25,7 +26,7 @@ import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 
 object ProducerResponse {
-  // readFrom assumes that the response is written using V1 format
+  // readFrom assumes that the response is written using V2 format
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
@@ -36,7 +37,8 @@ object ProducerResponse {
         val partition = buffer.getInt
         val error = buffer.getShort
         val offset = buffer.getLong
-        (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset))
+        val timestamp = buffer.getLong
+        (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset, timestamp))
       })
     })
 
@@ -45,7 +47,7 @@ object ProducerResponse {
   }
 }
 
-case class ProducerResponseStatus(var error: Short, offset: Long)
+case class ProducerResponseStatus(var error: Short, offset: Long, timestamp: Long = Message.NoTimestamp)
 
 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus],
@@ -72,7 +74,8 @@ case class ProducerResponse(correlationId: Int,
       currTopic._2.size * {
         4 + /* partition id */
         2 + /* error code */
-        8 /* offset */
+        8 + /* offset */
+        8 /* timestamp */
       }
     }) +
     throttleTimeSize
@@ -88,10 +91,11 @@ case class ProducerResponse(correlationId: Int,
       writeShortString(buffer, topic)
       buffer.putInt(errorsAndOffsets.size) // partition count
       errorsAndOffsets.foreach {
-        case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>
+        case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset, timestamp))) =>
           buffer.putInt(partition)
           buffer.putShort(error)
           buffer.putLong(nextOffset)
+          buffer.putLong(timestamp)
       }
     })
     // Throttle time is only supported on V1 style requests

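With the bump to V2, each per-partition status in the response body grows by one int64 for the timestamp. A sketch of the read side, mirroring readFrom above:

    import java.nio.ByteBuffer

    // V2 per-partition layout: partition (int32), error (int16), offset (int64),
    // timestamp (int64; Message.NoTimestamp, i.e. -1, when unset).
    def readPartitionStatus(buffer: ByteBuffer): (Int, Short, Long, Long) = {
      val partition = buffer.getInt
      val error = buffer.getShort
      val offset = buffer.getLong
      val timestamp = buffer.getLong // the 8 bytes added in V2
      (partition, error, offset, timestamp)
    }
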
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index e20b88c..9708c4e 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -62,6 +62,7 @@ object ErrorMapping {
   val TopicAuthorizationCode: Short = 29
   val GroupAuthorizationCode: Short = 30
   val ClusterAuthorizationCode: Short = 31
+  // 32: INVALID_TIMESTAMP
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 7942f57..3774e73 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -21,7 +21,9 @@ import java.util.Properties
 import java.util.regex.Pattern
 
 import kafka.common.StreamEndException
+import kafka.message.Message
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.record.TimestampType
 
 /**
  * A base consumer used to abstract both old and new consumer
@@ -35,7 +37,13 @@ trait BaseConsumer {
   def commit()
 }
 
-case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
+case class BaseConsumerRecord(topic: String,
+                              partition: Int,
+                              offset: Long,
+                              timestamp: Long = Message.NoTimestamp,
+                              timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
+                              key: Array[Byte],
+                              value: Array[Byte])
 
 class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -60,7 +68,13 @@ class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consume
     }
 
     val record = recordIter.next
-    BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+    BaseConsumerRecord(record.topic,
+                       record.partition,
+                       record.offset,
+                       record.timestamp,
+                       record.timestampType,
+                       record.key,
+                       record.value)
   }
 
   override def stop() {
@@ -89,7 +103,13 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
       throw new StreamEndException
 
     val messageAndMetadata = iter.next
-    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+    BaseConsumerRecord(messageAndMetadata.topic,
+                       messageAndMetadata.partition,
+                       messageAndMetadata.offset,
+                       messageAndMetadata.timestamp,
+                       messageAndMetadata.timestampType,
+                       messageAndMetadata.key,
+                       messageAndMetadata.message)
   }
 
   override def stop() {

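One subtlety of the new BaseConsumerRecord shape: the defaulted timestamp parameters precede key and value, so call sites that rely on the defaults must pass the trailing arguments by name. A sketch with placeholder values:

    import kafka.consumer.BaseConsumerRecord

    // timestamp defaults to Message.NoTimestamp and timestampType to
    // TimestampType.NO_TIMESTAMP_TYPE when left out like this.
    val rec = BaseConsumerRecord(
      topic = "test", partition = 0, offset = 0L,
      key = "k".getBytes("UTF-8"), value = "v".getBytes("UTF-8"))
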
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 0c6c810..0735651 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -100,7 +100,14 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
-    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
+    new MessageAndMetadata(currentTopicInfo.topic,
+                           currentTopicInfo.partitionId,
+                           item.message,
+                           item.offset,
+                           item.message.timestamp,
+                           item.message.timestampType,
+                           keyDecoder,
+                           valueDecoder)
   }
 
   def clearCurrentChunk() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 02ba814..513f383 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,7 +16,7 @@
 */
 package kafka.controller
 
-import kafka.api.{LeaderAndIsr, KAFKA_090, PartitionStateInfo}
+import kafka.api.{LeaderAndIsr, KAFKA_0_9_0, PartitionStateInfo}
 import kafka.utils._
 import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
@@ -380,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           topicPartition -> partitionState
         }
 
-        val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) (1: Short) else (0: Short)
+        val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) (1: Short) else (0: Short)
 
         val updateMetadataRequest =
           if (version == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 2f1b842..c86e87b 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -21,9 +21,10 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.message.{Message, UncompressedCodec}
+import kafka.message.UncompressedCodec
 import kafka.server._
 import kafka.utils._
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 
@@ -48,7 +49,8 @@ case class JoinGroupResult(members: Map[String, Array[Byte]],
 class GroupCoordinator(val brokerId: Int,
                        val groupConfig: GroupConfig,
                        val offsetConfig: OffsetConfig,
-                       val groupManager: GroupMetadataManager) extends Logging {
+                       val groupManager: GroupMetadataManager,
+                       time: Time) extends Logging {
   type JoinCallback = JoinGroupResult => Unit
   type SyncCallback = (Array[Byte], Short) => Unit
 
@@ -59,13 +61,6 @@ class GroupCoordinator(val brokerId: Int,
   private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
   private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
 
-  def this(brokerId: Int,
-           groupConfig: GroupConfig,
-           offsetConfig: OffsetConfig,
-           replicaManager: ReplicaManager,
-           zkUtils: ZkUtils) = this(brokerId, groupConfig, offsetConfig,
-    new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils))
-
   def offsetsTopicConfigs: Properties = {
     val props = new Properties
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
@@ -563,7 +558,7 @@ class GroupCoordinator(val brokerId: Int,
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
     // complete current heartbeat expectation
-    member.latestHeartbeat = SystemTime.milliseconds
+    member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
@@ -731,7 +726,8 @@ object GroupCoordinator {
 
   def create(config: KafkaConfig,
              zkUtils: ZkUtils,
-             replicaManager: ReplicaManager): GroupCoordinator = {
+             replicaManager: ReplicaManager,
+             time: Time): GroupCoordinator = {
     val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
@@ -743,22 +739,8 @@ object GroupCoordinator {
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils)
+    val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, time)
   }
 
-  def create(config: KafkaConfig,
-             groupManager: GroupMetadataManager): GroupCoordinator = {
-    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
-      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
-
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager)
-  }
 }

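With the zkUtils-based auxiliary constructor and the second create() overload removed, all wiring flows through the single factory, which now also takes the clock. A sketch, assuming a KafkaConfig, ZkUtils and ReplicaManager are already in scope (SystemTime here is the clients' wall-clock implementation of Time):

    import kafka.coordinator.GroupCoordinator
    import org.apache.kafka.common.utils.{SystemTime, Time}

    val time: Time = new SystemTime()
    val coordinator = GroupCoordinator.create(config, zkUtils, replicaManager, time)
    // Tests can inject a mock Time instead to drive heartbeat expiration deterministically.
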
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 48818c3..b3e1bc1 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -27,8 +27,10 @@ import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.Time
 
 import kafka.utils._
 import kafka.common._
@@ -47,14 +49,14 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 
-
 case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
                         callback: Map[TopicPartition, PartitionResponse] => Unit)
 
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
                            replicaManager: ReplicaManager,
-                           zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+                           zkUtils: ZkUtils,
+                           time: Time) extends Logging with KafkaMetricsGroup {
 
   /* offsets cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -141,7 +143,8 @@ class GroupMetadataManager(val brokerId: Int,
       // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
       // retry removing this group.
       val groupPartition = partitionFor(group.groupId)
-      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
+      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
+        timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition))
 
       val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
       partitionOpt.foreach { partition =>
@@ -166,10 +169,11 @@ class GroupMetadataManager(val brokerId: Int,
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Short => Unit): DelayedStore = {
-    // construct the message to append
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
-      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment)
+      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
+      timestamp = time.milliseconds(),
+      magicValue = getMessageFormatVersion(partitionFor(group.groupId))
     )
 
     val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
@@ -251,7 +255,9 @@ class GroupMetadataManager(val brokerId: Int,
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
       new Message(
         key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
-        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
+        timestamp = time.milliseconds(),
+        magicValue = getMessageFormatVersion(partitionFor(groupId))
       )
     }.toSeq
 
@@ -358,7 +364,7 @@ class GroupMetadataManager(val brokerId: Int,
         }
       }
 
-      val startMs = SystemTime.milliseconds
+      val startMs = time.milliseconds()
       try {
         replicaManager.logManager.getLog(topicPartition) match {
           case Some(log) =>
@@ -437,7 +443,7 @@ class GroupMetadataManager(val brokerId: Int,
 
             if (!shuttingDown.get())
               info("Finished loading offsets from %s in %d milliseconds."
-                .format(topicPartition, SystemTime.milliseconds - startMs))
+                .format(topicPartition, time.milliseconds() - startMs))
           case None =>
             warn("No log found for " + topicPartition)
         }
@@ -532,7 +538,7 @@ class GroupMetadataManager(val brokerId: Int,
 
   private def deleteExpiredOffsets() {
     debug("Collecting expired offsets.")
-    val startMs = SystemTime.milliseconds
+    val startMs = time.milliseconds()
 
     val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
       val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
@@ -551,7 +557,8 @@ class GroupMetadataManager(val brokerId: Int,
         val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
           groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
 
-        (offsetsPartition, new Message(bytes = null, key = commitKey))
+        (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(),
+          magicValue = getMessageFormatVersion(offsetsPartition)))
       }.groupBy { case (partition, tombstone) => partition }
 
       // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
@@ -580,7 +587,7 @@ class GroupMetadataManager(val brokerId: Int,
       }.sum
     }
 
-    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
+    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, time.milliseconds() - startMs))
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
@@ -620,6 +627,13 @@ class GroupMetadataManager(val brokerId: Int,
       config.offsetsTopicNumPartitions
   }
 
+  private def getMessageFormatVersion(partition: Int): Byte = {
+    val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
+    replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
+      throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
+    }
+  }
+
   /**
    * Add the partition into the owned list
    *
@@ -954,9 +968,9 @@ object GroupMetadataManager {
   // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
   // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
   class OffsetsMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
       val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-
+      // We ignore the record timestamp here because the offset commit value carries its own timestamp.
       // only print if the message is an offset record
       if (formattedKey.isInstanceOf[OffsetKey]) {
         val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
@@ -971,9 +985,9 @@ object GroupMetadataManager {
 
   // Formatter for use with tools to read group metadata history
   class GroupMetadataMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
       val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-
+      // We ignore the record timestamp here because the group metadata message carries its own timestamp.
       // only print if the message is a group metadata record
       if (formattedKey.isInstanceOf[GroupMetadataKey]) {
         val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key

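Custom formatters plugged into the console consumer now have to implement the widened writeTo signature. A minimal sketch against it (the surrounding MessageFormatter trait, with its init/close members, is elided here):

    import java.io.PrintStream
    import org.apache.kafka.common.record.TimestampType

    class TimestampedFormatter /* extends MessageFormatter */ {
      def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long,
                  timestampType: TimestampType, output: PrintStream) {
        // Print "<type>:<ts>" when a timestamp is present, reusing the NULL
        // convention the formatters above apply to keys.
        val ts = if (timestamp >= 0) s"$timestampType:$timestamp" else "NO_TIMESTAMP"
        val k = if (key == null) "NULL" else new String(key, "UTF-8")
        val v = if (value == null) "NULL" else new String(value, "UTF-8")
        output.println(s"$ts\t$k\t$v")
      }
    }
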
