kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: KAFKA-5378; Return LSO in FetchResponse plus some metrics
Date: Wed, 07 Jun 2017 15:46:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 60d2f1294 -> 172041d57


KAFKA-5378; Return LSO in FetchResponse plus some metrics

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3248 from hachikuji/KAFKA-5378

(cherry picked from commit dcbdce31ba525771016be5be4abc4a2067e0890b)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
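
The change below returns the last stable offset (LSO) in the FetchResponse and computes the consumer's records-lag metrics against it when the consumer reads with READ_COMMITTED isolation; under READ_UNCOMMITTED the lag is still measured against the high watermark. A minimal, self-contained Java sketch of that rule (the enum and class names are simplified stand-ins for illustration, not the Kafka internals changed in this diff; see SubscriptionState.partitionLag below):

    // Simplified stand-ins; the real change is in
    // SubscriptionState.partitionLag(TopicPartition, IsolationLevel) further down in this diff.
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    class PartitionLagSketch {
        // Returns null until the relevant end offset has been learned from a FetchResponse,
        // mirroring the null checks in SubscriptionState.
        static Long partitionLag(Long position, Long highWatermark, Long lastStableOffset,
                                 IsolationLevel isolationLevel) {
            if (isolationLevel == IsolationLevel.READ_COMMITTED)
                return lastStableOffset == null ? null : lastStableOffset - position;
            else
                return highWatermark == null ? null : highWatermark - position;
        }

        public static void main(String[] args) {
            // Values from the new FetcherTest cases: position 3, high watermark 200, LSO 150.
            System.out.println(partitionLag(3L, 200L, 150L, IsolationLevel.READ_UNCOMMITTED)); // 197
            System.out.println(partitionLag(3L, 200L, 150L, IsolationLevel.READ_COMMITTED));   // 147
        }
    }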


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/172041d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/172041d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/172041d5

Branch: refs/heads/0.11.0
Commit: 172041d572a53ae7984216837bf956f7f9cfd0fa
Parents: 60d2f12
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Jun 7 16:14:09 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jun 7 16:46:18 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   7 +-
 .../consumer/internals/SubscriptionState.java   |  14 +-
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../producer/internals/RecordAccumulator.java   |   2 +-
 .../producer/internals/TransactionManager.java  |   4 +-
 .../clients/consumer/internals/FetcherTest.java | 118 ++++++---
 .../internals/TransactionManagerTest.java       |  86 +++----
 .../main/scala/kafka/cluster/Partition.scala    |  11 +
 core/src/main/scala/kafka/log/Log.scala         |  11 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |  42 ++--
 .../unit/kafka/server/ISRExpirationTest.scala   |  25 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 249 ++++++++++---------
 .../unit/kafka/server/SimpleFetchTest.scala     |   5 +-
 15 files changed, 332 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e73ff4e..dff223f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -571,7 +571,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         "position to {}", position, partitionRecords.partition, nextOffset);
                 subscriptions.position(partitionRecords.partition, nextOffset);
 
-                Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
+                Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
                 if (partitionLag != null)
                     this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
 
@@ -855,6 +855,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
+
+                if (partition.lastStableOffset >= 0) {
+                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
+                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
+                }
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 421a3cf..e852bd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
+import org.apache.kafka.common.requests.IsolationLevel;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -318,15 +319,22 @@ public class SubscriptionState {
         return assignedState(tp).position;
     }
 
-    public Long partitionLag(TopicPartition tp) {
+    public Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) {
         TopicPartitionState topicPartitionState = assignedState(tp);
-        return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
+        if (isolationLevel == IsolationLevel.READ_COMMITTED)
+            return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position;
+        else
+            return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
     }
 
     public void updateHighWatermark(TopicPartition tp, long highWatermark) {
         assignedState(tp).highWatermark = highWatermark;
     }
 
+    public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
+        assignedState(tp).lastStableOffset = lastStableOffset;
+    }
+
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
@@ -427,6 +435,7 @@ public class SubscriptionState {
     private static class TopicPartitionState {
         private Long position; // last consumed position
         private Long highWatermark; // the high watermark from last fetch
+        private Long lastStableOffset;
         private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
         private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
@@ -435,6 +444,7 @@ public class SubscriptionState {
             this.paused = false;
             this.position = null;
             this.highWatermark = null;
+            this.lastStableOffset = null;
             this.committed = null;
             this.resetStrategy = null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0f109d8..a7336d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -608,7 +608,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
         if (transactionManager != null)
-            transactionManager.failIfUnreadyForSend();
+            transactionManager.failIfNotReadyForSend();
 
         TopicPartition tp = null;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 0315b13..5d4af74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -469,7 +469,7 @@ public final class RecordAccumulator {
                                         ProducerIdAndEpoch producerIdAndEpoch = null;
                                         boolean isTransactional = false;
                                         if (transactionManager != null) {
-                                            if (!transactionManager.sendToPartitionAllowed(tp))
+                                            if (!transactionManager.isSendToPartitionAllowed(tp))
                                                 break;
 
                                             producerIdAndEpoch = transactionManager.producerIdAndEpoch();

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index dcd7a1f..d959d7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -235,7 +235,7 @@ public class TransactionManager {
         return lastError;
     }
 
-    public synchronized void failIfUnreadyForSend() {
+    public synchronized void failIfNotReadyForSend() {
         if (hasError())
             throw new KafkaException("Cannot perform send because at least one previous transactional or " +
                     "idempotent request has failed with errors.", lastError);
@@ -250,7 +250,7 @@ public class TransactionManager {
         }
     }
 
-    synchronized boolean sendToPartitionAllowed(TopicPartition tp) {
+    synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
         if (hasFatalError())
             return false;
         return !isTransactional() || partitionsInTransaction.contains(tp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0ed1139..cad17bc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -170,7 +170,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -212,7 +212,7 @@ public class FetcherTest {
 
         buffer.flip();
 
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -235,7 +235,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -276,7 +276,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.seek(tp1, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -329,7 +329,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // the first fetchedRecords() should return the first valid message
@@ -351,7 +351,7 @@ public class FetcherTest {
         // Should not throw exception after the seek.
         fetcher.fetchedRecords();
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1);
@@ -384,7 +384,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // the fetchedRecords() should always throw exception due to the bad batch.
@@ -415,7 +415,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -448,7 +448,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.seek(tp1, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(memoryRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, memoryRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -478,8 +478,8 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.seek(tp1, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
-        client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -562,7 +562,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp1);
         assertEquals(3, consumerRecords.size());
@@ -622,7 +622,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
         MemoryRecords partialRecord = MemoryRecords.readableRecords(
             ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}));
-        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, partialRecord, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
     }
@@ -634,7 +634,7 @@ public class FetcherTest {
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -654,7 +654,7 @@ public class FetcherTest {
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(singleton(tp1));
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // The active fetch should be ignored since its position is no longer valid
@@ -669,7 +669,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         subscriptions.pause(tp1);
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertNull(fetcher.fetchedRecords().get(tp1));
     }
@@ -690,7 +690,7 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -702,7 +702,7 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -714,7 +714,7 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp1));
@@ -729,7 +729,7 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         subscriptions.seek(tp1, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -743,7 +743,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp1, 0);
 
         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
         subscriptionsNoAutoReset.seek(tp1, 2);
@@ -756,7 +756,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp1, 0);
 
         fetcherNoAutoReset.sendFetches();
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
 
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
@@ -824,7 +824,7 @@ public class FetcherTest {
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         assertEquals(2, fetcher.fetchedRecords().get(tp1).size());
@@ -850,7 +850,7 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true);
+        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
 
@@ -1116,7 +1116,7 @@ public class FetcherTest {
             ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
-            FetchResponse response = fetchResponse(nextRecords, Errors.NONE, i, throttleTimeMs);
+            FetchResponse response = fetchResponse(tp1, nextRecords, Errors.NONE, i, throttleTimeMs);
             buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
@@ -1131,7 +1131,6 @@ public class FetcherTest {
         client.close();
     }
 
-
     /*
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
@@ -1150,7 +1149,7 @@ public class FetcherTest {
         assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
 
         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
-        fetchRecords(MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
+        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
         assertEquals(100, recordsFetchLagMax.value(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
@@ -1161,8 +1160,48 @@ public class FetcherTest {
                 TimestampType.CREATE_TIME, 0L);
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-        fetchRecords(builder.build(), Errors.NONE, 200L, 0);
+        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(197, partitionLag.value(), EPSILON);
+
+        // verify de-registration of partition lag
+        subscriptions.unsubscribe();
+        assertFalse(allMetrics.containsKey(partitionLagMetric));
+    }
+
+    @Test
+    public void testReadCommittedLagMetric() {
+        Metrics metrics = new Metrics();
+        fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
+
+        MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
+        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
+
+        // recordsFetchLagMax should be initialized to negative infinity
+        assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+
+        // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
+        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
+        assertEquals(50, recordsFetchLagMax.value(), EPSILON);
+
+        KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
+        assertEquals(50, partitionLag.value(), EPSILON);
+
+        // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0);
+        assertEquals(147, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(147, partitionLag.value(), EPSILON);
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
@@ -1188,7 +1227,7 @@ public class FetcherTest {
         for (Record record : records.records())
             expectedBytes += record.sizeInBytes();
 
-        fetchRecords(records, Errors.NONE, 100L, 0);
+        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
         assertEquals(3, recordsCountAverage.value(), EPSILON);
     }
@@ -1214,7 +1253,7 @@ public class FetcherTest {
                 expectedBytes += record.sizeInBytes();
         }
 
-        fetchRecords(records, Errors.NONE, 100L, 0);
+        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
         assertEquals(2, recordsCountAverage.value(), EPSILON);
     }
@@ -1294,9 +1333,15 @@ public class FetcherTest {
         assertEquals(3, recordsCountAverage.value(), EPSILON);
     }
 
-    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) {
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+        return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);
+    }
+
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
+        client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, throttleTime));
         consumerClient.poll(0);
         return fetcher.fetchedRecords();
     }
@@ -1821,13 +1866,14 @@ public class FetcherTest {
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 
-    private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
-        return fetchResponse(tp1, records, error, hw, throttleTime);
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+        return fetchResponse(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);
     }
 
-    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                        long lastStableOffset, int throttleTime) {
         Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
-                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+                new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4f2f10f..55c9241 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -123,48 +123,48 @@ public class TransactionManagerTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testFailIfUnreadyForSendNoProducerId() {
-        transactionManager.failIfUnreadyForSend();
+    public void testFailIfNotReadyForSendNoProducerId() {
+        transactionManager.failIfNotReadyForSend();
     }
 
     @Test
-    public void testFailIfUnreadyForSendIdempotentProducer() {
+    public void testFailIfNotReadyForSendIdempotentProducer() {
         TransactionManager idempotentTransactionManager = new TransactionManager();
-        idempotentTransactionManager.failIfUnreadyForSend();
+        idempotentTransactionManager.failIfNotReadyForSend();
     }
 
     @Test(expected = KafkaException.class)
-    public void testFailIfUnreadyForSendIdempotentProducerFatalError() {
+    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
         TransactionManager idempotentTransactionManager = new TransactionManager();
         idempotentTransactionManager.transitionToFatalError(new KafkaException());
-        idempotentTransactionManager.failIfUnreadyForSend();
+        idempotentTransactionManager.failIfNotReadyForSend();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testFailIfUnreadyForSendNoOngoingTransaction() {
+    public void testFailIfNotReadyForSendNoOngoingTransaction() {
         long pid = 13131L;
         short epoch = 1;
         doInitTransactions(pid, epoch);
-        transactionManager.failIfUnreadyForSend();
+        transactionManager.failIfNotReadyForSend();
     }
 
     @Test(expected = KafkaException.class)
-    public void testFailIfUnreadyForSendAfterAbortableError() {
+    public void testFailIfNotReadyForSendAfterAbortableError() {
         long pid = 13131L;
         short epoch = 1;
         doInitTransactions(pid, epoch);
         transactionManager.beginTransaction();
         transactionManager.transitionToAbortableError(new KafkaException());
-        transactionManager.failIfUnreadyForSend();
+        transactionManager.failIfNotReadyForSend();
     }
 
     @Test(expected = KafkaException.class)
-    public void testFailIfUnreadyForSendAfterFatalError() {
+    public void testFailIfNotReadyForSendAfterFatalError() {
         long pid = 13131L;
         short epoch = 1;
         doInitTransactions(pid, epoch);
         transactionManager.transitionToFatalError(new KafkaException());
-        transactionManager.failIfUnreadyForSend();
+        transactionManager.failIfNotReadyForSend();
     }
 
     @Test
@@ -334,7 +334,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
+    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -344,12 +344,12 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
         transactionManager.transitionToAbortableError(new KafkaException());
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasAbortableError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
+    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -362,12 +362,12 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         transactionManager.transitionToAbortableError(new KafkaException());
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasAbortableError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
+    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -377,12 +377,12 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
         transactionManager.transitionToFatalError(new KafkaException());
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasFatalError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
+    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -395,12 +395,12 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         transactionManager.transitionToFatalError(new KafkaException());
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasFatalError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
+    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -414,12 +414,12 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPartitionsToAdd());
         transactionManager.transitionToAbortableError(new KafkaException());
 
-        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasAbortableError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
+    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -432,17 +432,17 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPartitionsToAdd());
         transactionManager.transitionToFatalError(new KafkaException());
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertTrue(transactionManager.hasFatalError());
     }
 
     @Test
-    public void testSendToPartitionAllowedWithUnaddedPartition() {
+    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
         final long pid = 13131L;
         final short epoch = 1;
         doInitTransactions(pid, epoch);
         transactionManager.beginTransaction();
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -487,11 +487,11 @@ public class TransactionManagerTest {
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
         assertFalse(transactionManager.transactionContainsPartition(tp0));
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         sender.run(time.milliseconds());  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
-        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
         assertFalse(responseFuture.isDone());
 
         sender.run(time.milliseconds());  // send produce request.
@@ -1300,8 +1300,8 @@ public class TransactionManagerTest {
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
 
-        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
-        assertFalse(transactionManager.sendToPartitionAllowed(tp1));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
 
         Node node1 = new Node(0, "localhost", 1111);
         Node node2 = new Node(1, "localhost", 1112);
@@ -1341,7 +1341,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitions, should be in abortable state.
 
         assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.sendToPartitionAllowed(tp1));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
 
         // Try to drain a message destined for tp1, it should get drained.
         Node node1 = new Node(1, "localhost", 1112);
@@ -1361,30 +1361,6 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasAbortableError());
     }
 
-    @Test
-    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
-        final long pid = 13131L;
-        final short epoch = 1;
-        doInitTransactions(pid, epoch);
-        transactionManager.beginTransaction();
-        // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
-        Node node1 = new Node(0, "localhost", 1111);
-        PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
-
-        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
-                Collections.<String>emptySet(), Collections.<String>emptySet());
-        Set<Node> nodes = new HashSet<>();
-        nodes.add(node1);
-        Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
-                time.milliseconds());
-
-        // We shouldn't drain batches which haven't been added to the transaction yet.
-        assertTrue(drainedBatches.containsKey(node1.id()));
-        assertTrue(drainedBatches.get(node1.id()).isEmpty());
-    }
-
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e617404..aa11ba1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -98,6 +98,17 @@ class Partition(val topic: String,
     tags
   )
 
+  newGauge("LastStableOffsetLag",
+    new Gauge[Long] {
+      def value = {
+        leaderReplicaIfLocal.map { replica =>
+          replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
+        }.getOrElse(0)
+      }
+    },
+    tags
+  )
+
   private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
 
   def isUnderReplicated: Boolean =
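
The LastStableOffsetLag gauge added above reports the gap between the leader replica's high watermark and its last stable offset, falling back to 0 when this broker is not the partition's leader. A self-contained sketch of that value function (plain Java with hypothetical stand-in types and illustrative numbers; the actual gauge is the Scala newGauge block in the hunk above):

    import java.util.Optional;

    class LastStableOffsetLagSketch {
        // Hypothetical stand-in for the offsets the broker reads via leaderReplicaIfLocal.
        static final class LeaderOffsets {
            final long highWatermark;
            final long lastStableOffset;
            LeaderOffsets(long highWatermark, long lastStableOffset) {
                this.highWatermark = highWatermark;
                this.lastStableOffset = lastStableOffset;
            }
        }

        static long lastStableOffsetLag(Optional<LeaderOffsets> leaderReplicaIfLocal) {
            return leaderReplicaIfLocal
                    .map(r -> r.highWatermark - r.lastStableOffset)
                    .orElse(0L);
        }

        public static void main(String[] args) {
            System.out.println(lastStableOffsetLag(Optional.of(new LeaderOffsets(15L, 10L)))); // 5 (illustrative numbers)
            System.out.println(lastStableOffsetLag(Optional.empty())); // 0 when this broker is not the leader
        }
    }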

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4bd2c2c..7a3bc94 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -868,13 +868,18 @@ class Log(@volatile var dir: File,
     // We create the local variables to avoid race conditions with updates to the log.
     val currentNextOffsetMetadata = nextOffsetMetadata
     val next = currentNextOffsetMetadata.messageOffset
-    if(startOffset == next)
-      return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
+    if (startOffset == next) {
+      val abortedTransactions =
+        if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction])
+        else None
+      return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
+        abortedTransactions = abortedTransactions)
+    }
 
     var segmentEntry = segments.floorEntry(startOffset)
 
     // return error on attempt to read beyond the log end offset or read below log start offset
-    if(startOffset > next || segmentEntry == null || startOffset < logStartOffset)
+    if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
 
     // Do the read on the segment with a base offset less than the target offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 13a01bc..8a9ce02 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -156,8 +156,8 @@ class DelayedFetch(delayMs: Long,
       isolationLevel = isolationLevel)
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
-      tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records,
-        result.info.abortedTransactions)
+      tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
+        result.lastStableOffset, result.info.abortedTransactions)
     }
 
     responseCallback(fetchPartitionData)

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1d289b3..6cff0e6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -533,7 +533,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val partitionData = {
         responsePartitionData.map { case (tp, data) =>
           val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
-          tp -> new FetchResponse.PartitionData(data.error, data.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+          val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+          tp -> new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
             data.logStartOffset, abortedTransactions, data.records)
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d2670b7..853b7c4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -74,12 +74,13 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
  * @param error Exception if error encountered while reading from the log
  */
 case class LogReadResult(info: FetchDataInfo,
-                         hw: Long,
+                         highWatermark: Long,
                          leaderLogStartOffset: Long,
                          leaderLogEndOffset: Long,
                          followerLogStartOffset: Long,
                          fetchTimeMs: Long,
                          readSize: Int,
+                         lastStableOffset: Option[Long],
                          exception: Option[Throwable] = None) {
 
   def error: Errors = exception match {
@@ -88,22 +89,27 @@ case class LogReadResult(info: FetchDataInfo,
   }
 
   override def toString =
-    s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
+    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
     s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
 
 }
 
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records,
+case class FetchPartitionData(error: Errors = Errors.NONE,
+                              highWatermark: Long,
+                              logStartOffset: Long,
+                              records: Records,
+                              lastStableOffset: Option[Long],
                               abortedTransactions: Option[List[AbortedTransaction]])
 
 object LogReadResult {
   val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
-                                           hw = -1L,
+                                           highWatermark = -1L,
                                            leaderLogStartOffset = -1L,
                                            leaderLogEndOffset = -1L,
                                            followerLogStartOffset = -1L,
                                            fetchTimeMs = -1L,
-                                           readSize = -1)
+                                           readSize = -1,
+                                           lastStableOffset = None)
 }
 
 case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Errors], error: Errors) {
@@ -627,8 +633,8 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records,
-          result.info.abortedTransactions)
+        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
+          result.lastStableOffset, result.info.abortedTransactions)
       }
       responseCallback(fetchPartitionData)
     } else {
@@ -684,11 +690,15 @@ class ReplicaManager(val config: KafkaConfig,
         else
           getReplicaOrException(tp)
 
-        // decide whether to only fetch committed data (i.e. messages below high watermark)
-        val maxOffsetOpt = if (isolationLevel == IsolationLevel.READ_COMMITTED)
+        val initialHighWatermark = localReplica.highWatermark.messageOffset
+        val lastStableOffset = if (isolationLevel == IsolationLevel.READ_COMMITTED)
           Some(localReplica.lastStableOffset.messageOffset)
-        else if (readOnlyCommitted)
-          Some(localReplica.highWatermark.messageOffset)
+        else
+          None
+
+        // decide whether to only fetch committed data (i.e. messages below high watermark)
+        val maxOffsetOpt = if (readOnlyCommitted)
+          Some(lastStableOffset.getOrElse(initialHighWatermark))
         else
           None
 
@@ -699,7 +709,6 @@ class ReplicaManager(val config: KafkaConfig,
          * This can cause a replica to always be out of sync.
          */
         val initialLogEndOffset = localReplica.logEndOffset.messageOffset
-        val initialHighWatermark = localReplica.highWatermark.messageOffset
         val initialLogStartOffset = localReplica.logStartOffset
         val fetchTimeMs = time.milliseconds
         val logReadInfo = localReplica.log match {
@@ -724,12 +733,13 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         LogReadResult(info = logReadInfo,
-                      hw = initialHighWatermark,
+                      highWatermark = initialHighWatermark,
                       leaderLogStartOffset = initialLogStartOffset,
                       leaderLogEndOffset = initialLogEndOffset,
                       followerLogStartOffset = followerLogStartOffset,
                       fetchTimeMs = fetchTimeMs,
                       readSize = partitionFetchSize,
+                      lastStableOffset = lastStableOffset,
                       exception = None)
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
@@ -739,24 +749,26 @@ class ReplicaManager(val config: KafkaConfig,
                  _: ReplicaNotAvailableException |
                  _: OffsetOutOfRangeException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
-                        hw = -1L,
+                        highWatermark = -1L,
                         leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
                         followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
+                        lastStableOffset = None,
                         exception = Some(e))
         case e: Throwable =>
           brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
           brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
           error(s"Error processing fetch operation on partition $tp, offset $offset", e)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
-                        hw = -1L,
+                        highWatermark = -1L,
                         leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
                         followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
+                        lastStableOffset = None,
                         exception = Some(e))
       }
     }
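
On the broker side, LogReadResult and FetchPartitionData now carry the last stable offset (and the hw field is renamed to highWatermark), and the read itself is bounded as follows: a READ_COMMITTED consumer fetch is capped at the LSO, any other consumer fetch at the high watermark, and follower fetches are not capped. A compact Java sketch of that selection, with hypothetical parameter names mirroring readFromLocalLog above:

    import java.util.OptionalLong;

    class MaxFetchOffsetSketch {
        // readOnlyCommitted is true for consumer fetches and false for follower fetches;
        // lastStableOffset is present only when the fetch uses READ_COMMITTED isolation.
        static OptionalLong maxFetchOffset(boolean readOnlyCommitted,
                                           OptionalLong lastStableOffset,
                                           long highWatermark) {
            if (!readOnlyCommitted)
                return OptionalLong.empty(); // followers may read past the high watermark
            return OptionalLong.of(lastStableOffset.orElse(highWatermark));
        }

        public static void main(String[] args) {
            System.out.println(maxFetchOffset(true, OptionalLong.of(150L), 200L)); // READ_COMMITTED: capped at 150
            System.out.println(maxFetchOffset(true, OptionalLong.empty(), 200L));  // READ_UNCOMMITTED: capped at 200
            System.out.println(maxFetchOffset(false, OptionalLong.empty(), 200L)); // follower: no cap
        }
    }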

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index da94569..5d221fe 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -80,12 +80,13 @@ class IsrExpirationTest {
     // let the follower catch up to the Leader logEndOffset (15)
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                    hw = 15L,
+                                                    highWatermark = 15L,
                                                     leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
-                                                    readSize = -1))
+                                                    readSize = -1,
+                                                    lastStableOffset = None))
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -134,12 +135,13 @@ class IsrExpirationTest {
     // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
-                                                    hw = 10L,
+                                                    highWatermark = 10L,
                                                     leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
-                                                    readSize = -1))
+                                                    readSize = -1,
+                                                    lastStableOffset = None))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -150,12 +152,13 @@ class IsrExpirationTest {
 
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-                            hw = 11L,
+                            highWatermark = 11L,
                             leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
-                            readSize = -1))
+                            readSize = -1,
+                            lastStableOffset = None))
     }
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -169,12 +172,13 @@ class IsrExpirationTest {
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                            hw = 15L,
+                            highWatermark = 15L,
                             leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
-                            readSize = -1))
+                            readSize = -1,
+                            lastStableOffset = None))
     }
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -195,12 +199,13 @@ class IsrExpirationTest {
     // set lastCaughtUpTime to current time
     for (replica <- partition.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
-                                                    hw = 0L,
+                                                    highWatermark = 0L,
                                                     leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 0L,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
-                                                    readSize = -1))
+                                                    readSize = -1,
+                                                    lastStableOffset = None))
     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
     partition
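
For reference, the hunks above all make the same mechanical change: LogReadResult's `hw` parameter
becomes `highWatermark`, and a `lastStableOffset` option is added. A minimal construction under the
new signature is sketched below; the offsets and the Some(12L) LSO are illustrative values only (not
taken from the commit), and `time` is assumed to be the MockTime fixture these tests already use.

    // Sketch only: a LogReadResult built the way the updated tests build it.
    // Offsets are illustrative; `time` is assumed to be the test's MockTime.
    val readResult = new LogReadResult(
      info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
      highWatermark = 15L,                  // renamed from the old `hw` parameter
      leaderLogStartOffset = 0L,
      leaderLogEndOffset = 15L,
      followerLogStartOffset = 0L,
      fetchTimeMs = time.milliseconds,
      readSize = -1,
      lastStableOffset = Some(12L))         // new field; None when no LSO is being tracked

The updated tests pass lastStableOffset = None because these follower-style reads do not need an LSO;
it is only surfaced to READ_COMMITTED consumer fetches, as exercised in the ReplicaManagerTest changes below.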

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 45f62d7..e078e12 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,9 +32,10 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -139,20 +140,6 @@ class ReplicaManagerTest {
       metadataCache)
 
     try {
-      var produceCallbackFired = false
-      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
-        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION,
-          responseStatus.values.head.error)
-        produceCallbackFired = true
-      }
-
-      var fetchCallbackFired = false
-      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
-        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION,
-          responseStatus.map(_._2).head.error)
-        fetchCallbackFired = true
-      }
-
       val brokerList = Seq[Integer](0, 1).asJava
       val brokerSet = Set[Integer](0, 1).asJava
 
@@ -166,11 +153,14 @@ class ReplicaManagerTest {
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
-      appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
+      val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
+        assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
+      }
 
       // Fetch some messages
-      fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), fetchCallback,
+      val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000),
         minBytes = 100000)
+      assertFalse(fetchResult.isFired)
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
@@ -178,8 +168,8 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
-      assertTrue(produceCallbackFired)
-      assertTrue(fetchCallbackFired)
+      assertTrue(appendResult.isFired)
+      assertTrue(fetchResult.isFired)
     } finally {
       rm.shutdown(checkpointHW = false)
     }
@@ -204,10 +194,6 @@ class ReplicaManagerTest {
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
-      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) =
-        responseStatus.values.foreach { status =>
-          assertEquals(Errors.NONE, status.error)
-        }
 
       val producerId = 234L
       val epoch = 5.toShort
@@ -217,67 +203,64 @@ class ReplicaManagerTest {
       for (sequence <- 0 until numRecords) {
         val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
-        appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback)
-      }
-
-      var fetchCallbackFired = false
-      var fetchError = Errors.NONE
-      var fetchedRecords: Records = null
-      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
-        fetchError = responseStatus.map(_._2).head.error
-        fetchedRecords = responseStatus.map(_._2).head.records
-        fetchCallbackFired = true
+        appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
       }
 
       // fetch as follower to advance the high watermark
-      fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+      fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords, 0, 100000),
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // fetch should return empty since LSO should be stuck at 0
-      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
-      assertTrue(fetchCallbackFired)
-      assertEquals(Errors.NONE, fetchError)
-      assertTrue(fetchedRecords.batches.asScala.isEmpty)
-      fetchCallbackFired = false
+      var consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0),
+        new PartitionData(0, 0, 100000), isolationLevel = IsolationLevel.READ_COMMITTED)
+      var fetchData = consumerFetchResult.assertFired
+      assertEquals(Errors.NONE, fetchData.error)
+      assertTrue(fetchData.records.batches.asScala.isEmpty)
+      assertEquals(Some(0), fetchData.lastStableOffset)
+      assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions)
 
       // delayed fetch should timeout and return nothing
-      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000)
+      consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000),
+        isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000)
+      assertFalse(consumerFetchResult.isFired)
       timer.advanceClock(1001)
 
-      assertTrue(fetchCallbackFired)
-      assertEquals(Errors.NONE, fetchError)
-      assertTrue(fetchedRecords.batches.asScala.isEmpty)
-      fetchCallbackFired = false
+      fetchData = consumerFetchResult.assertFired
+      assertEquals(Errors.NONE, fetchData.error)
+      assertTrue(fetchData.records.batches.asScala.isEmpty)
+      assertEquals(Some(0), fetchData.lastStableOffset)
+      assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions)
 
       // now commit the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
       val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> commitRecordBatch), produceCallback,
-        isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, isFromClient = false)
+        .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // the LSO has advanced, but the appended commit marker has not been replicated, so
       // none of the data from the transaction should be visible yet
-      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
+      consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000),
+        isolationLevel = IsolationLevel.READ_COMMITTED)
 
-      assertTrue(fetchCallbackFired)
-      assertEquals(Errors.NONE, fetchError)
-      assertTrue(fetchedRecords.batches.asScala.isEmpty)
-      fetchCallbackFired = false
+      fetchData = consumerFetchResult.assertFired
+      assertEquals(Errors.NONE, fetchData.error)
+      assertTrue(fetchData.records.batches.asScala.isEmpty)
 
       // fetch as follower to advance the high watermark
-      fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+      fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords + 1, 0, 100000),
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // now all of the records should be fetchable
-      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
+      consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000),
+        isolationLevel = IsolationLevel.READ_COMMITTED)
 
-      assertTrue(fetchCallbackFired)
-      assertEquals(Errors.NONE, fetchError)
-      assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size)
+      fetchData = consumerFetchResult.assertFired
+      assertEquals(Errors.NONE, fetchData.error)
+      assertEquals(Some(numRecords + 1), fetchData.lastStableOffset)
+      assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions)
+      assertEquals(numRecords + 1, fetchData.records.batches.asScala.size)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -302,11 +285,6 @@ class ReplicaManagerTest {
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
-      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) =
-        responseStatus.values.foreach { status =>
-          assertEquals(Errors.NONE, status.error)
-        }
-
       val producerId = 234L
       val epoch = 5.toShort
 
@@ -315,35 +293,32 @@ class ReplicaManagerTest {
       for (sequence <- 0 until numRecords) {
         val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
-        appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback)
+        appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
       }
 
       // now abort the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0)
       val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> abortRecordBatch), produceCallback,
-        isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, isFromClient = false)
+        .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // fetch as follower to advance the high watermark
-      fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
-        responseState => (), isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-
-      var fetchDataOpt: Option[FetchPartitionData] = None
-      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
-        fetchDataOpt = Some(responseStatus.map(_._2).head)
-      }
+      fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords + 1, 0, 100000),
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // Set the minBytes in order force this request to enter purgatory. When it returns, we should still
       // see the newly aborted transaction.
-      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 10000)
-      assertTrue(fetchDataOpt.isEmpty)
+      val fetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000),
+        isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 10000)
+      assertFalse(fetchResult.isFired)
 
       timer.advanceClock(1001)
-      assertTrue(fetchDataOpt.isDefined)
+      val fetchData = fetchResult.assertFired
 
-      val fetchData = fetchDataOpt.get
       assertEquals(Errors.NONE, fetchData.error)
+      assertEquals(Some(numRecords + 1), fetchData.lastStableOffset)
       assertEquals(numRecords + 1, fetchData.records.records.asScala.size)
       assertTrue(fetchData.abortedTransactions.isDefined)
       assertEquals(1, fetchData.abortedTransactions.get.size)
@@ -389,84 +364,118 @@ class ReplicaManagerTest {
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
-      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
-
       // Append a couple of messages.
       for(i <- 1 to 2) {
         val records = TestUtils.singletonRecords(s"message $i".getBytes)
-        appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
-      }
-
-      var fetchCallbackFired = false
-      var fetchError = Errors.NONE
-      var fetchedRecords: Records = null
-      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
-        fetchError = responseStatus.map(_._2).head.error
-        fetchedRecords = responseStatus.map(_._2).head.records
-        fetchCallbackFired = true
+        appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
       }
 
       // Fetch a message above the high watermark as a follower
-      fetchAsFollower(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
-      assertTrue(fetchCallbackFired)
-      assertEquals("Should not give an exception", Errors.NONE, fetchError)
-      assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext)
-      fetchCallbackFired = false
+      val followerFetchResult = fetchAsFollower(rm, new TopicPartition(topic, 0), new PartitionData(1, 0, 100000))
+      val followerFetchData = followerFetchResult.assertFired
+      assertEquals("Should not give an exception", Errors.NONE, followerFetchData.error)
+      assertTrue("Should return some data", followerFetchData.records.batches.iterator.hasNext)
 
       // Fetch a message above the high watermark as a consumer
-      fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
-      assertTrue(fetchCallbackFired)
-      assertEquals("Should not give an exception", Errors.NONE, fetchError)
-      assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
+      val consumerFetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0), new PartitionData(1, 0, 100000))
+      val consumerFetchData = consumerFetchResult.assertFired
+      assertEquals("Should not give an exception", Errors.NONE, consumerFetchData.error)
+      assertEquals("Should return empty response", MemoryRecords.EMPTY, consumerFetchData.records)
     } finally {
       rm.shutdown(checkpointHW = false)
     }
   }
 
+  private class CallbackResult[T] {
+    private var value: Option[T] = None
+    private var fun: Option[T => Unit] = None
+
+    def assertFired: T = {
+      assertTrue("Callback has not been fired", isFired)
+      value.get
+    }
+
+    def isFired: Boolean = {
+      value.isDefined
+    }
+
+    def fire(value: T): Unit = {
+      this.value = Some(value)
+      fun.foreach(f => f(value))
+    }
+
+    def onFire(fun: T => Unit): CallbackResult[T] = {
+      this.fun = Some(fun)
+      if (this.isFired) fire(value.get)
+      this
+    }
+  }
+
   private def appendRecords(replicaManager: ReplicaManager,
-                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                            isFromClient: Boolean = true): Unit = {
+                            partition: TopicPartition,
+                            records: MemoryRecords,
+                            isFromClient: Boolean = true): CallbackResult[PartitionResponse] = {
+    val result = new CallbackResult[PartitionResponse]()
+    def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
+      val response = responses.get(partition)
+      assertTrue(response.isDefined)
+      result.fire(response.get)
+    }
+
     replicaManager.appendRecords(
       timeout = 1000,
       requiredAcks = -1,
       internalTopicsAllowed = false,
       isFromClient = isFromClient,
-      entriesPerPartition = entriesPerPartition,
-      responseCallback = responseCallback)
+      entriesPerPartition = Map(partition -> records),
+      responseCallback = appendCallback)
+
+    result
   }
 
   private def fetchAsConsumer(replicaManager: ReplicaManager,
-                              fetchInfos: Seq[(TopicPartition, PartitionData)],
-                              fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                              partition: TopicPartition,
+                              partitionData: PartitionData,
                               minBytes: Int = 0,
-                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
-    fetchMessages(replicaManager, replicaId = -1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
+    fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel)
   }
 
   private def fetchAsFollower(replicaManager: ReplicaManager,
-                              fetchInfos: Seq[(TopicPartition, PartitionData)],
-                              fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                              partition: TopicPartition,
+                              partitionData: PartitionData,
                               minBytes: Int = 0,
-                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
-    fetchMessages(replicaManager, replicaId = 1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
+    fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel)
   }
 
   private def fetchMessages(replicaManager: ReplicaManager,
                             replicaId: Int,
-                            fetchInfos: Seq[(TopicPartition, PartitionData)],
-                            fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                            partition: TopicPartition,
+                            partitionData: PartitionData,
                             minBytes: Int,
-                            isolationLevel: IsolationLevel): Unit = {
+                            isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = {
+    val result = new CallbackResult[FetchPartitionData]()
+    def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
+      assertEquals(1, responseStatus.size)
+      val (topicPartition, fetchData) = responseStatus.head
+      assertEquals(partition, topicPartition)
+      result.fire(fetchData)
+    }
+
     replicaManager.fetchMessages(
       timeout = 1000,
       replicaId = replicaId,
       fetchMinBytes = minBytes,
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
-      fetchInfos = fetchInfos,
+      fetchInfos = Seq(partition -> partitionData),
       responseCallback = fetchCallback,
       isolationLevel = isolationLevel)
+
+    result
   }
 
   private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer): ReplicaManager = {
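
The ReplicaManagerTest changes above replace the hand-rolled `fetchCallbackFired`/`fetchedRecords`
bookkeeping with the small CallbackResult[T] helper, so each append or fetch returns an object that
records whether, and with what value, its callback fired. A minimal usage sketch follows, assuming the
CallbackResult[T] class from the diff above is in scope and JUnit's assertions are on the classpath.

    import org.junit.Assert.{assertEquals, assertFalse}

    val result = new CallbackResult[String]()
    result.onFire { value => assertEquals("done", value) }  // assertion runs once the callback fires
    assertFalse(result.isFired)                             // e.g. a fetch still parked in purgatory
    result.fire("done")                                     // what the response callback does on completion
    assertEquals("done", result.assertFired)                // the captured value can be read afterwards

This is why the tests can now assert both that a delayed fetch has not completed before
timer.advanceClock and that it carries the expected lastStableOffset once it does.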

http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index dad4b78..72d7fc5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -124,12 +124,13 @@ class SimpleFetchTest {
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
     followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
-                                                          hw = leo.messageOffset,
+                                                          highWatermark = leo.messageOffset,
                                                           leaderLogStartOffset = 0L,
                                                           leaderLogEndOffset = leo.messageOffset,
                                                           followerLogStartOffset = 0L,
                                                           fetchTimeMs = time.milliseconds,
-                                                          readSize = -1))
+                                                          readSize = -1,
+                                                          lastStableOffset = None))
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)

