kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6397: Consumer should not block setting positions of unavailable partitions (#4557)
Date Wed, 14 Feb 2018 18:52:50 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d18d88  KAFKA-6397: Consumer should not block setting positions of unavailable partitions (#4557)
6d18d88 is described below

commit 6d18d882b850a8d757c32bb124b1e42e60587c69
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Feb 14 10:52:46 2018 -0800

    KAFKA-6397: Consumer should not block setting positions of unavailable partitions (#4557)
    
    Prior to this patch, the consumer always blocks in poll() if there are any partitions which are awaiting their initial positions. This behavior was inconsistent with normal fetch behavior since we allow fetching on available partitions even if one or more of the assigned partitions becomes unavailable _after_ initial offset lookup. With this patch, the consumer will do offset resets asynchronously, which allows other partitions to make progress even if the initial positions for some p [...]
    
    I have added several new unit tests in `FetcherTest` and `KafkaConsumerTest` to verify the new behavior. One minor compatibility implication worth mentioning is apparent from the change I made in `DynamicBrokerReconfigurationTest`. Previously it was possible to assume that all partitions had a fetch position after `poll()` completed with a non-empty assignment. This assumption is no longer generally true, but you can force the positions to be updated using the `position()` API which s [...]
    
    Note that this patch also removes the logic to cache committed offsets in `SubscriptionState` since it was no longer needed (the consumer's `committed()` API always does an offset lookup anyway). In addition to avoiding the complexity of maintaining the cache, this avoids wasteful offset lookups to refresh the cache when `commitAsync()` is used.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  80 +--
 .../kafka/clients/consumer/MockConsumer.java       |  34 +-
 .../consumer/internals/ConsumerCoordinator.java    |  32 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 405 ++++++++-------
 .../clients/consumer/internals/RequestFuture.java  |   8 -
 .../consumer/internals/SubscriptionState.java      | 141 +++---
 .../java/org/apache/kafka/clients/MockClient.java  |   8 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 552 ++++++++++-----------
 .../kafka/clients/consumer/MockConsumerTest.java   |  10 +-
 .../internals/ConsumerCoordinatorTest.java         | 105 ++--
 .../clients/consumer/internals/FetcherTest.java    | 275 +++++++---
 .../consumer/internals/SubscriptionStateTest.java  |  24 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  18 +-
 13 files changed, 955 insertions(+), 737 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2f7fd58..cbe54c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -642,8 +642,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
-             keyDeserializer,
-             valueDeserializer);
+             keyDeserializer, valueDeserializer);
     }
 
     @SuppressWarnings("unchecked")
@@ -779,6 +778,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metricsRegistry.fetcherMetrics,
                     this.time,
                     this.retryBackoffMs,
+                    this.requestTimeoutMs,
                     isolationLevel);
 
             config.logUnused();
@@ -1144,12 +1144,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         client.maybeTriggerWakeup();
-        coordinator.poll(time.milliseconds(), timeout);
 
-        // fetch positions if we have partitions we're subscribed to that we
-        // don't know the offset for
-        if (!subscriptions.hasAllFetchPositions())
-            updateFetchPositions(this.subscriptions.missingFetchPositions());
+        long startMs = time.milliseconds();
+        coordinator.poll(startMs, timeout);
+
+        // Lookup positions of assigned partitions
+        boolean hasAllFetchPositions = updateFetchPositions();
 
         // if data is available already, return it immediately
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@@ -1159,10 +1159,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // send any new fetches (won't resend pending fetches)
         fetcher.sendFetches();
 
-        long now = time.milliseconds();
-        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
+        long nowMs = time.milliseconds();
+        long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
+        long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+        if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
+            pollTimeout = retryBackoffMs;
 
-        client.poll(pollTimeout, now, new PollCondition() {
+        client.poll(pollTimeout, nowMs, new PollCondition() {
             @Override
             public boolean shouldBlock() {
                 // since a fetch might be completed by the background thread, we need this poll condition
@@ -1359,7 +1365,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to beginning of partition {}", tp);
-                subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+                subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
             }
         } finally {
             release();
@@ -1385,7 +1391,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to end of partition {}", tp);
-                subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+                subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
             }
         } finally {
             release();
@@ -1400,7 +1406,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * encountered (in which case it is thrown to the caller).
      *
      * @param partition The partition to get the position for
-     * @return The offset
+     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
      * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
      * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
      *             the partition
@@ -1419,9 +1425,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
-            if (offset == null) {
+            while (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
-                updateFetchPositions(subscriptions.assignedPartitions());
+                updateFetchPositions();
+                client.poll(retryBackoffMs);
                 offset = this.subscriptions.position(partition);
             }
             return offset;
@@ -1611,7 +1618,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                             entry.getValue() + ". The target time cannot be negative.");
             }
-            return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
+            return fetcher.offsetsByTimes(timestampsToSearch, requestTimeoutMs);
         } finally {
             release();
         }
@@ -1672,7 +1679,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         } finally {
             release();
         }
-
     }
 
     /**
@@ -1770,28 +1776,32 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @param partitions The partitions that needs updating fetch positions
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
+     * @return true if all assigned partitions have a position, false otherwise
      */
-    private void updateFetchPositions(Set<TopicPartition> partitions) {
-        // lookup any positions for partitions which are awaiting reset (which may be the
-        // case if the user called seekToBeginning or seekToEnd. We do this check first to
-        // avoid an unnecessary lookup of committed offsets (which typically occurs when
-        // the user is manually assigning partitions and managing their own offsets).
-        fetcher.resetOffsetsIfNeeded(partitions);
-
-        if (!subscriptions.hasAllFetchPositions(partitions)) {
-            // if we still don't have offsets for the given partitions, then we should either
-            // seek to the last committed position or reset using the auto reset policy
-
-            // first refresh commits for all assigned partitions
-            coordinator.refreshCommittedOffsetsIfNeeded();
-
-            // then do any offset lookups in case some positions are not known
-            fetcher.updateFetchPositions(partitions);
-        }
+    private boolean updateFetchPositions() {
+        if (subscriptions.hasAllFetchPositions())
+            return true;
+
+        // If there are any partitions which do not have a valid position and are not
+        // awaiting reset, then we need to fetch committed offsets. We will only do a
+        // coordinator lookup if there are partitions which have missing positions, so
+        // a consumer with manually assigned partitions can avoid a coordinator dependence
+        // by always ensuring that assigned partitions have an initial position.
+        coordinator.refreshCommittedOffsetsIfNeeded();
+
+        // If there are partitions still needing a position and a reset policy is defined,
+        // request reset using the default policy. If no reset strategy is defined and there
+        // are partitions with a missing position, then we will raise an exception.
+        subscriptions.resetMissingPositions();
+
+        // Finally send an asynchronous request to lookup and update the positions of any
+        // partitions which are awaiting reset.
+        fetcher.resetOffsetsIfNeeded();
+
+        return false;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index d9085b0..c492995 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -50,16 +50,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     private final Map<String, List<PartitionInfo>> partitions;
     private final SubscriptionState subscriptions;
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private Set<TopicPartition> paused;
-    private boolean closed;
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
+    private final Map<TopicPartition, OffsetAndMetadata> committed;
+    private final Queue<Runnable> pollTasks;
+    private final Set<TopicPartition> paused;
 
-    private Queue<Runnable> pollTasks;
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private KafkaException exception;
-
     private AtomicBoolean wakeup;
+    private boolean closed;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(offsetResetStrategy);
@@ -72,6 +72,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.pollTasks = new LinkedList<>();
         this.exception = null;
         this.wakeup = new AtomicBoolean(false);
+        this.committed = new HashMap<>();
     }
 
     @Override
@@ -99,6 +100,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
+        committed.clear();
         this.subscriptions.subscribe(pattern, listener);
         Set<String> topicsToSubscribe = new HashSet<>();
         for (String topic: partitions.keySet()) {
@@ -126,18 +128,21 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
+        committed.clear();
         this.subscriptions.subscribe(new HashSet<>(topics), listener);
     }
 
     @Override
     public synchronized void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
+        committed.clear();
         this.subscriptions.assignFromUser(new HashSet<>(partitions));
     }
 
     @Override
     public synchronized void unsubscribe() {
         ensureNotClosed();
+        committed.clear();
         subscriptions.unsubscribe();
     }
 
@@ -165,8 +170,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
 
         // Handle seeks that need to wait for a poll() call to be processed
-        for (TopicPartition tp : subscriptions.missingFetchPositions())
-            updateFetchPosition(tp);
+        for (TopicPartition tp : subscriptions.assignedPartitions())
+            if (!subscriptions.hasValidPosition(tp))
+                updateFetchPosition(tp);
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
@@ -211,7 +217,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         ensureNotClosed();
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
-            subscriptions.committed(entry.getKey(), entry.getValue());
+            committed.put(entry.getKey(), entry.getValue());
         if (callback != null) {
             callback.onComplete(offsets, null);
         }
@@ -248,7 +254,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized OffsetAndMetadata committed(TopicPartition partition) {
         ensureNotClosed();
         if (subscriptions.isAssigned(partition)) {
-            return subscriptions.committed(partition);
+            return committed.get(partition);
         }
         return new OffsetAndMetadata(0);
     }
@@ -270,7 +276,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
-            subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+            subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
     }
 
     public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
@@ -281,7 +287,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
-            subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+            subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
     }
 
     public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
@@ -408,11 +414,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private void updateFetchPosition(TopicPartition tp) {
         if (subscriptions.isOffsetResetNeeded(tp)) {
             resetOffsetPosition(tp);
-        } else if (subscriptions.committed(tp) == null) {
-            subscriptions.needOffsetReset(tp);
+        } else if (!committed.containsKey(tp)) {
+            subscriptions.requestOffsetReset(tp);
             resetOffsetPosition(tp);
         } else {
-            subscriptions.seek(tp, subscriptions.committed(tp).offset());
+            subscriptions.seek(tp, committed.get(tp).offset());
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d7c1ce9..0aa61bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -217,11 +217,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
-
-        // set the flag to refresh last committed offsets
-        subscriptions.needRefreshCommits();
-
-        // update partition assignment
         subscriptions.assignFromSubscribed(assignment.partitions());
 
         // check if the assignment contains some topics that were not in the original
@@ -257,7 +252,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
 
         // execute the user's callback after rebalance
-        ConsumerRebalanceListener listener = subscriptions.listener();
+        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
         log.info("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
         try {
             Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
@@ -412,7 +407,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         maybeAutoCommitOffsetsSync(rebalanceTimeoutMs);
 
         // execute the user's callback before rebalance
-        ConsumerRebalanceListener listener = subscriptions.listener();
+        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
         log.info("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
         try {
             Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
@@ -447,15 +442,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * Refresh the committed offsets for provided partitions.
      */
     public void refreshCommittedOffsetsIfNeeded() {
-        if (subscriptions.refreshCommitsNeeded()) {
-            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
-            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-                TopicPartition tp = entry.getKey();
-                // verify assignment is still active
-                if (subscriptions.isAssigned(tp))
-                    this.subscriptions.committed(tp, entry.getValue());
-            }
-            this.subscriptions.commitsRefreshed();
+        Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions);
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            long offset = entry.getValue().offset();
+            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
+            this.subscriptions.seek(tp, offset);
         }
     }
 
@@ -465,6 +458,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return A map from partition to the committed offset
      */
     public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+        if (partitions.isEmpty())
+            return Collections.emptyMap();
+
         while (true) {
             ensureCoordinatorReady();
 
@@ -547,7 +543,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
-        this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
         future.addListener(new RequestFutureListener<Void>() {
@@ -747,9 +742,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
                     log.debug("Committed offset {} for partition {}", offset, tp);
-                    if (subscriptions.isAssigned(tp))
-                        // update the local cache only if the partition is still assigned
-                        subscriptions.committed(tp, offsetAndMetadata);
                 } else {
                     log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0711d15..ac199bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -30,7 +29,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -85,6 +83,7 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyList;
 import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
@@ -102,6 +101,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final int maxWaitMs;
     private final int fetchSize;
     private final long retryBackoffMs;
+    private final long requestTimeoutMs;
     private final int maxPollRecords;
     private final boolean checkCrcs;
     private final Metadata metadata;
@@ -109,11 +109,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
-
     private final ExtendedDeserializer<K> keyDeserializer;
     private final ExtendedDeserializer<V> valueDeserializer;
     private final IsolationLevel isolationLevel;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
 
     private PartitionRecords nextInLineRecords = null;
 
@@ -133,6 +133,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                    FetcherMetricsRegistry metricsRegistry,
                    Time time,
                    long retryBackoffMs,
+                   long requestTimeoutMs,
                    IsolationLevel isolationLevel) {
         this.log = logContext.logger(Fetcher.class);
         this.logContext = logContext;
@@ -151,6 +152,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
         this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
 
@@ -161,17 +163,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * Represents data about an offset returned by a broker.
      */
     private static class OffsetData {
-        /**
-         * The offset
-         */
         final long offset;
-
-        /**
-         * The timestamp.
-         *
-         * Will be null if the broker does not support returning timestamps.
-         */
-        final Long timestamp;
+        final Long timestamp; //  null if the broker does not support returning timestamps
 
         OffsetData(long offset, Long timestamp) {
             this.offset = offset;
@@ -197,12 +190,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final FetchRequest.Builder request = FetchRequest.Builder.
-                forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
-                .isolationLevel(isolationLevel)
-                .setMaxBytes(this.maxBytes)
-                .metadata(data.metadata())
-                .toForget(data.toForget());
+            final FetchRequest.Builder request = FetchRequest.Builder
+                    .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
+                    .isolationLevel(isolationLevel)
+                    .setMaxBytes(this.maxBytes)
+                    .metadata(data.metadata())
+                    .toForget(data.toForget());
             if (log.isDebugEnabled()) {
                 log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
             }
@@ -251,51 +244,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     }
 
     /**
-     * Lookup and set offsets for any partitions which are awaiting an explicit reset.
-     * @param partitions the partitions to reset
-     */
-    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
-        final Set<TopicPartition> needsOffsetReset = new HashSet<>();
-        for (TopicPartition tp : partitions) {
-            if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
-                needsOffsetReset.add(tp);
-        }
-        if (!needsOffsetReset.isEmpty()) {
-            resetOffsets(needsOffsetReset);
-        }
-    }
-
-    /**
-     * Update the fetch positions for the provided partitions.
-     * @param partitions the partitions to update positions for
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
-     */
-    public void updateFetchPositions(Set<TopicPartition> partitions) {
-        final Set<TopicPartition> needsOffsetReset = new HashSet<>();
-        // reset the fetch position to the committed position
-        for (TopicPartition tp : partitions) {
-            if (!subscriptions.isAssigned(tp) || subscriptions.hasValidPosition(tp))
-                continue;
-
-            if (subscriptions.isOffsetResetNeeded(tp)) {
-                needsOffsetReset.add(tp);
-            } else if (subscriptions.committed(tp) == null) {
-                // there's no committed position, so we need to reset with the default strategy
-                subscriptions.needOffsetReset(tp);
-                needsOffsetReset.add(tp);
-            } else {
-                long committed = subscriptions.committed(tp).offset();
-                log.debug("Resetting offset for partition {} to the committed offset {}", tp, committed);
-                subscriptions.seek(tp, committed);
-            }
-        }
-
-        if (!needsOffsetReset.isEmpty()) {
-            resetOffsets(needsOffsetReset);
-        }
-    }
-
-    /**
      * Get topic metadata for all topics in the cluster
      * @param timeout time for which getting topic metadata is attempted
      * @return The map of topics with their partition information
@@ -393,93 +341,96 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             return client.send(node, request);
     }
 
-    private void offsetResetStrategyTimestamp(
-            final TopicPartition partition,
-            final Map<TopicPartition, Long> output,
-            final Set<TopicPartition> partitionsWithNoOffsets) {
+    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
         OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
         if (strategy == OffsetResetStrategy.EARLIEST)
-            output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP);
+            return ListOffsetRequest.EARLIEST_TIMESTAMP;
         else if (strategy == OffsetResetStrategy.LATEST)
-            output.put(partition, endTimestamp());
+            return ListOffsetRequest.LATEST_TIMESTAMP;
         else
-            partitionsWithNoOffsets.add(partition);
+            return null;
     }
 
     /**
-     * Reset offsets for the given partition using the offset reset strategy.
+     * Reset offsets for all assigned partitions that require it.
      *
-     * @param partitions  The partitions that need offsets reset
      * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     *   and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
-    private void resetOffsets(final Set<TopicPartition> partitions) {
-        final Map<TopicPartition, Long> offsetResets = new HashMap<>();
-        final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
-        for (final TopicPartition partition : partitions) {
-            offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets);
-        }
-        final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false);
+    public void resetOffsetsIfNeeded() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+
+        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
+        if (partitions.isEmpty())
+            return;
+
+        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
         for (final TopicPartition partition : partitions) {
-            final OffsetData offsetData = offsetsByTimes.get(partition);
-            if (offsetData == null) {
-                partitionsWithNoOffsets.add(partition);
-                continue;
-            }
-            // we might lose the assignment while fetching the offset, so check it is still active
-            if (subscriptions.isAssigned(partition)) {
-                log.debug("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
-                this.subscriptions.seek(partition, offsetData.offset);
-            }
-        }
-        if (!partitionsWithNoOffsets.isEmpty()) {
-            throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
+            Long timestamp = offsetResetStrategyTimestamp(partition);
+            if (timestamp != null)
+                offsetResetTimestamps.put(partition, timestamp);
         }
+
+        resetOffsetsAsync(offsetResetTimestamps);
     }
 
-    public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
-                                                                     long timeout) {
-        Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
+                                                                  long timeout) {
+        Map<TopicPartition, OffsetData> fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch,
+                timeout, true).fetchedOffsets;
 
         HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
         for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet())
             offsetsByTimes.put(entry.getKey(), null);
 
-        for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetData> entry : fetchedOffsets.entrySet()) {
             // 'entry.getValue().timestamp' will not be null since we are guaranteed
             // to work with a v1 (or later) ListOffset request
-            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(entry.getValue().offset, entry.getValue().timestamp));
+            OffsetData offsetData = entry.getValue();
+            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp));
         }
 
         return offsetsByTimes;
     }
 
-    private Map<TopicPartition, OffsetData> retrieveOffsetsByTimes(
-            Map<TopicPartition, Long> timestampsToSearch, long timeout, boolean requireTimestamps) {
+    private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
+                                                 long timeout,
+                                                 boolean requireTimestamps) {
+        ListOffsetResult result = new ListOffsetResult();
         if (timestampsToSearch.isEmpty())
-            return Collections.emptyMap();
+            return result;
+
+        Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
 
         long startMs = time.milliseconds();
         long remaining = timeout;
         do {
-            RequestFuture<Map<TopicPartition, OffsetData>> future =
-                    sendListOffsetRequests(requireTimestamps, timestampsToSearch);
+            RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
             client.poll(future, remaining);
 
             if (!future.isDone())
                 break;
 
-            if (future.succeeded())
-                return future.value();
+            if (future.succeeded()) {
+                ListOffsetResult value = future.value();
+                result.fetchedOffsets.putAll(value.fetchedOffsets);
+                if (value.partitionsToRetry.isEmpty())
+                    return result;
 
-            if (!future.isRetriable())
+                remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
+            } else if (!future.isRetriable()) {
                 throw future.exception();
+            }
 
             long elapsed = time.milliseconds() - startMs;
             remaining = timeout - elapsed;
             if (remaining <= 0)
                 break;
 
-            if (future.exception() instanceof InvalidMetadataException)
+            if (metadata.updateRequested())
                 client.awaitMetadataUpdate(remaining);
             else
                 time.sleep(Math.min(remaining, retryBackoffMs));
@@ -487,7 +438,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             elapsed = time.milliseconds() - startMs;
             remaining = timeout - elapsed;
         } while (remaining > 0);
-        throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms");
+
+        throw new TimeoutException("Failed to get offsets by times in " + timeout + "ms");
     }
 
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, long timeout) {
@@ -495,11 +447,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     }
 
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, long timeout) {
-        return beginningOrEndOffset(partitions, endTimestamp(), timeout);
-    }
-
-    private long endTimestamp() {
-        return ListOffsetRequest.LATEST_TIMESTAMP;
+        return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timeout);
     }
 
     private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
@@ -508,12 +456,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
         for (TopicPartition tp : partitions)
             timestampsToSearch.put(tp, timestamp);
-        Map<TopicPartition, Long> result = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetData> entry :
-                retrieveOffsetsByTimes(timestampsToSearch, timeout, false).entrySet()) {
-            result.put(entry.getKey(), entry.getValue().offset);
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timeout, false);
+        for (Map.Entry<TopicPartition, OffsetData> entry : result.fetchedOffsets.entrySet()) {
+            offsets.put(entry.getKey(), entry.getValue().offset);
         }
-        return result;
+        return offsets;
     }
 
     /**
@@ -569,14 +517,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
             log.debug("Not returning fetched records for partition {} since it is no longer assigned",
                     partitionRecords.partition);
+        } else if (!subscriptions.isFetchable(partitionRecords.partition)) {
+            // this can happen when a partition is paused before fetched records are returned to the consumer's
+            // poll call or if the offset is being reset
+            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
+                    partitionRecords.partition);
         } else {
-            // note that the consumed position should always be available as long as the partition is still assigned
             long position = subscriptions.position(partitionRecords.partition);
-            if (!subscriptions.isFetchable(partitionRecords.partition)) {
-                // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
-                        partitionRecords.partition);
-            } else if (partitionRecords.nextFetchOffset == position) {
+            if (partitionRecords.nextFetchOffset == position) {
                 List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
 
                 long nextOffset = partitionRecords.nextFetchOffset;
@@ -606,32 +554,124 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         return emptyList();
     }
 
+    private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) {
+        // we might lose the assignment while fetching the offset, or the user might seek to a different offset,
+        // so verify it is still assigned and still in need of the requested reset
+        if (!subscriptions.isAssigned(partition)) {
+            log.debug("Skipping reset of partition {} since it is no longer assigned", partition);
+        } else if (!subscriptions.isOffsetResetNeeded(partition)) {
+            log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
+        } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) {
+            log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition);
+        } else {
+            log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
+            subscriptions.seek(partition, offsetData.offset);
+        }
+    }
+
+    private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
+        // Add the topics to the metadata to do a single metadata fetch.
+        for (TopicPartition tp : partitionResetTimestamps.keySet())
+            metadata.add(tp.topic());
+
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps);
+        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
+            final Map<TopicPartition, Long> resetTimestamps = entry.getValue();
+            subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
+
+            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(entry.getKey(), resetTimestamps, false);
+            future.addListener(new RequestFutureListener<ListOffsetResult>() {
+                @Override
+                public void onSuccess(ListOffsetResult result) {
+                    if (!result.partitionsToRetry.isEmpty()) {
+                        subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
+                        metadata.requestUpdate();
+                    }
+
+                    for (Map.Entry<TopicPartition, OffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
+                        TopicPartition partition = fetchedOffset.getKey();
+                        OffsetData offsetData = fetchedOffset.getValue();
+                        Long requestedResetTimestamp = resetTimestamps.get(partition);
+                        resetOffsetIfNeeded(partition, requestedResetTimestamp, offsetData);
+                    }
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    subscriptions.resetFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs);
+                    metadata.requestUpdate();
+
+                    if (!(e instanceof RetriableException) && !cachedListOffsetsException.compareAndSet(null, e))
+                        log.error("Discarding error in ListOffsetResponse because another error is pending", e);
+                }
+            });
+        }
+    }
+
     /**
      * Search the offsets by target times for the specified partitions.
      *
+     * @param timestampsToSearch the mapping between partitions and target time
      * @param requireTimestamps true if we should fail with an UnsupportedVersionException if the broker does
      *                         not support fetching precise timestamps for offsets
-     * @param timestampsToSearch the mapping between partitions and target time
      * @return A response which can be polled to obtain the corresponding timestamps and offsets.
      */
-    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequests(
-            final boolean requireTimestamps,
-            final Map<TopicPartition, Long> timestampsToSearch) {
-        // Group the partitions by node.
-        final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+    private RequestFuture<ListOffsetResult> sendListOffsetsRequests(final Map<TopicPartition, Long> timestampsToSearch,
+                                                                    final boolean requireTimestamps) {
         // Add the topics to the metadata to do a single metadata fetch.
         for (TopicPartition tp : timestampsToSearch.keySet())
             metadata.add(tp.topic());
 
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(timestampsToSearch);
+        if (timestampsToSearchByNode.isEmpty())
+            return RequestFuture.failure(new StaleMetadataException());
+
+        final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
+        final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
+        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
+        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
+
+        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
+            RequestFuture<ListOffsetResult> future =
+                    sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+            future.addListener(new RequestFutureListener<ListOffsetResult>() {
+                @Override
+                public void onSuccess(ListOffsetResult partialResult) {
+                    synchronized (listOffsetRequestsFuture) {
+                        fetchedTimestampOffsets.putAll(partialResult.fetchedOffsets);
+                        partitionsToRetry.addAll(partialResult.partitionsToRetry);
+
+                        if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) {
+                            ListOffsetResult result = new ListOffsetResult(fetchedTimestampOffsets, partitionsToRetry);
+                            listOffsetRequestsFuture.complete(result);
+                        }
+                    }
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    synchronized (listOffsetRequestsFuture) {
+                        if (!listOffsetRequestsFuture.isDone())
+                            listOffsetRequestsFuture.raise(e);
+                    }
+                }
+            });
+        }
+        return listOffsetRequestsFuture;
+    }
+
+    private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch) {
+        final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
-                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
-                return RequestFuture.staleMetadata();
+                metadata.add(tp.topic());
+                log.debug("Partition {} is unknown for fetching offset", tp);
+                metadata.requestUpdate();
             } else if (info.leader() == null) {
-                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
-                return RequestFuture.leaderNotAvailable();
+                log.debug("Leader for partition {} unavailable for fetching offset", tp);
+                metadata.requestUpdate();
             } else {
                 Node node = info.leader();
                 Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
@@ -642,33 +682,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 topicData.put(entry.getKey(), entry.getValue());
             }
         }
-
-        final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>();
-        final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
-        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
-        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
-            sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
-                    .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
-                        @Override
-                        public void onSuccess(Map<TopicPartition, OffsetData> value) {
-                            synchronized (listOffsetRequestsFuture) {
-                                fetchedTimestampOffsets.putAll(value);
-                                if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
-                                    listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(RuntimeException e) {
-                            synchronized (listOffsetRequestsFuture) {
-                                // This may cause all the requests to be retried, but should be rare.
-                                if (!listOffsetRequestsFuture.isDone())
-                                    listOffsetRequestsFuture.raise(e);
-                            }
-                        }
-                    });
-        }
-        return listOffsetRequestsFuture;
+        return timestampsToSearchByNode;
     }
 
     /**
@@ -679,18 +693,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * @param requireTimestamp  True if we require a timestamp in the response.
      * @return A response which can be polled to obtain the corresponding timestamps and offsets.
      */
-    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
-                                                                                 final Map<TopicPartition, Long> timestampsToSearch,
-                                                                                 boolean requireTimestamp) {
+    private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
+                                                                  final Map<TopicPartition, Long> timestampsToSearch,
+                                                                  boolean requireTimestamp) {
         ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
                 .forConsumer(requireTimestamp, isolationLevel)
                 .setTargetTimes(timestampsToSearch);
 
-        log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
+        log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
         return client.send(node, builder)
-                .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetData>>() {
+                .compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() {
                     @Override
-                    public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetData>> future) {
+                    public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
                         ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
                         log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                         handleListOffsetResponse(timestampsToSearch, lor, future);
@@ -712,8 +726,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
                                           ListOffsetResponse listOffsetResponse,
-                                          RequestFuture<Map<TopicPartition, OffsetData>> future) {
-        Map<TopicPartition, OffsetData> timestampOffsetMap = new HashMap<>();
+                                          RequestFuture<ListOffsetResult> future) {
+        Map<TopicPartition, OffsetData> fetchedOffsets = new HashMap<>();
+        Set<TopicPartition> partitionsToRetry = new HashSet<>();
+        Set<String> unauthorizedTopics = new HashSet<>();
+
         for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
@@ -735,7 +752,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             topicPartition, offset);
                     if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
                         OffsetData offsetData = new OffsetData(offset, null);
-                        timestampOffsetMap.put(topicPartition, offsetData);
+                        fetchedOffsets.put(topicPartition, offsetData);
                     }
                 } else {
                     // Handle v1 and later response
@@ -743,31 +760,49 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             topicPartition, partitionData.offset, partitionData.timestamp);
                     if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
                         OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
-                        timestampOffsetMap.put(topicPartition, offsetData);
+                        fetchedOffsets.put(topicPartition, offsetData);
                     }
                 }
             } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
-                // The message format on the broker side is before 0.10.0, we simply put null in the response.
+                // The message format on the broker side is before 0.10.0, which means it does not
+                // support timestamps. We treat this case the same as if we weren't able to find an
+                // offset corresponding to the requested timestamp and leave it out of the result.
                 log.debug("Cannot search by timestamp for partition {} because the message format version " +
                         "is before 0.10.0", topicPartition);
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);
-                future.raise(error);
-                return;
+                partitionsToRetry.add(topicPartition);
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition " +
-                        "may not exist or the user may not have Describe access to it.", topicPartition);
-                future.raise(error);
-                return;
+                log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
+                partitionsToRetry.add(topicPartition);
+            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                unauthorizedTopics.add(topicPartition.topic());
             } else {
                 log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, error.message());
-                future.raise(new StaleMetadataException());
-                return;
+                partitionsToRetry.add(topicPartition);
             }
         }
-        if (!future.isDone())
-            future.complete(timestampOffsetMap);
+
+        if (!unauthorizedTopics.isEmpty())
+            future.raise(new TopicAuthorizationException(unauthorizedTopics));
+        else
+            future.complete(new ListOffsetResult(fetchedOffsets, partitionsToRetry));
+    }
+
+    private static class ListOffsetResult {
+        private final Map<TopicPartition, OffsetData> fetchedOffsets;
+        private final Set<TopicPartition> partitionsToRetry;
+
+        public ListOffsetResult(Map<TopicPartition, OffsetData> fetchedOffsets, Set<TopicPartition> partitionsNeedingRetry) {
+            this.fetchedOffsets = fetchedOffsets;
+            this.partitionsToRetry = partitionsNeedingRetry;
+        }
+
+        public ListOffsetResult() {
+            this.fetchedOffsets = new HashMap<>();
+            this.partitionsToRetry = new HashSet<>();
+        }
     }
 
     private List<TopicPartition> fetchablePartitions() {
@@ -891,8 +926,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
-                        "may not exist or the user may not have Describe access to it", tp);
+                log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                 if (fetchOffset != subscriptions.position(tp)) {
@@ -900,7 +934,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
                 } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                     log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                    subscriptions.needOffsetReset(tp);
+                    subscriptions.requestOffsetReset(tp);
                 } else {
                     throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
                 }
@@ -925,7 +959,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         return partitionRecords;
     }
 
-
     /**
      * Parse the record entry, deserializing the key / value fields if necessary
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index f7e8ca1..52733ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -242,18 +242,10 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
         return failure(Errors.COORDINATOR_NOT_AVAILABLE.exception());
     }
 
-    public static <T> RequestFuture<T> leaderNotAvailable() {
-        return failure(Errors.LEADER_NOT_AVAILABLE.exception());
-    }
-
     public static <T> RequestFuture<T> noBrokersAvailable() {
         return failure(new NoAvailableBrokersException());
     }
 
-    public static <T> RequestFuture<T> staleMetadata() {
-        return failure(new StaleMetadataException());
-    }
-
     @Override
     public boolean shouldBlock() {
         return !isDone();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 8228707..e289734 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
@@ -48,10 +49,6 @@ import java.util.regex.Pattern;
  *
  * Note that pause state as well as fetch/consumed positions are not preserved when partition
  * assignment is changed whether directly by the user or through a group rebalance.
- *
- * This class also maintains a cache of the latest commit position for each of the assigned
- * partitions. This is updated through {@link #committed(TopicPartition, OffsetAndMetadata)} and can be used
- * to set the initial fetch position (e.g. {@link Fetcher#resetOffsets(Set)}.
  */
 public class SubscriptionState {
     private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
@@ -76,24 +73,20 @@ public class SubscriptionState {
     /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
     private final PartitionStates<TopicPartitionState> assignment;
 
-    /* do we need to request the latest committed offsets from the coordinator? */
-    private boolean needsFetchCommittedOffsets;
-
     /* Default offset reset strategy */
     private final OffsetResetStrategy defaultResetStrategy;
 
-    /* User-provided listener to be invoked when assignment changes */
-    private ConsumerRebalanceListener listener;
-
     /* Listeners provide a hook for internal state cleanup (e.g. metrics) on assignment changes */
-    private List<Listener> listeners = new ArrayList<>();
+    private final List<Listener> listeners = new ArrayList<>();
+
+    /* User-provided listener to be invoked when assignment changes */
+    private ConsumerRebalanceListener rebalanceListener;
 
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
         this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
-        this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;
     }
@@ -117,7 +110,7 @@ public class SubscriptionState {
 
         setSubscriptionType(SubscriptionType.AUTO_TOPICS);
 
-        this.listener = listener;
+        this.rebalanceListener = listener;
 
         changeSubscription(topics);
     }
@@ -174,7 +167,6 @@ public class SubscriptionState {
                 partitionToState.put(partition, state);
             }
             this.assignment.set(partitionToState);
-            this.needsFetchCommittedOffsets = true;
         }
     }
 
@@ -200,9 +192,7 @@ public class SubscriptionState {
                     throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic; subscription is " + this.subscription);
         }
 
-        // after rebalancing, we always reinitialize the assignment value
         this.assignment.set(assignedPartitionStates);
-        this.needsFetchCommittedOffsets = true;
     }
 
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -211,7 +201,7 @@ public class SubscriptionState {
 
         setSubscriptionType(SubscriptionType.AUTO_PATTERN);
 
-        this.listener = listener;
+        this.rebalanceListener = listener;
         this.subscribedPattern = pattern;
     }
 
@@ -270,26 +260,6 @@ public class SubscriptionState {
         return state;
     }
 
-    public void committed(TopicPartition tp, OffsetAndMetadata offset) {
-        assignedState(tp).committed(offset);
-    }
-
-    public OffsetAndMetadata committed(TopicPartition tp) {
-        return assignedState(tp).committed;
-    }
-
-    public void needRefreshCommits() {
-        this.needsFetchCommittedOffsets = true;
-    }
-
-    public boolean refreshCommitsNeeded() {
-        return this.needsFetchCommittedOffsets;
-    }
-
-    public void commitsRefreshed() {
-        this.needsFetchCommittedOffsets = false;
-    }
-
     public void seek(TopicPartition tp, long offset) {
         assignedState(tp).seek(offset);
     }
@@ -353,12 +323,18 @@ public class SubscriptionState {
         return allConsumed;
     }
 
-    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
-        assignedState(partition).awaitReset(offsetResetStrategy);
+    public void requestOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
+        assignedState(partition).reset(offsetResetStrategy);
     }
 
-    public void needOffsetReset(TopicPartition partition) {
-        needOffsetReset(partition, defaultResetStrategy);
+    public void requestOffsetReset(TopicPartition partition) {
+        requestOffsetReset(partition, defaultResetStrategy);
+    }
+
+    public void setResetPending(Set<TopicPartition> partitions, long nextAllowResetTimeMs) {
+        for (TopicPartition partition : partitions) {
+            assignedState(partition).setResetPending(nextAllowResetTimeMs);
+        }
     }
 
     public boolean hasDefaultOffsetResetPolicy() {
@@ -373,26 +349,50 @@ public class SubscriptionState {
         return assignedState(partition).resetStrategy;
     }
 
-    public boolean hasAllFetchPositions(Collection<TopicPartition> partitions) {
-        for (TopicPartition partition : partitions)
-            if (!hasValidPosition(partition))
+    public boolean hasAllFetchPositions() {
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            if (!state.value().hasValidPosition())
                 return false;
+        }
         return true;
     }
 
-    public boolean hasAllFetchPositions() {
-        return hasAllFetchPositions(this.assignedPartitions());
-    }
-
     public Set<TopicPartition> missingFetchPositions() {
         Set<TopicPartition> missing = new HashSet<>();
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            if (!state.value().hasValidPosition())
+            if (state.value().isMissingPosition())
                 missing.add(state.topicPartition());
         }
         return missing;
     }
 
+    public void resetMissingPositions() {
+        final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            TopicPartition tp = state.topicPartition();
+            TopicPartitionState partitionState = state.value();
+            if (partitionState.isMissingPosition()) {
+                if (defaultResetStrategy == OffsetResetStrategy.NONE)
+                    partitionsWithNoOffsets.add(tp);
+                else
+                    partitionState.reset(defaultResetStrategy);
+            }
+        }
+
+        if (!partitionsWithNoOffsets.isEmpty())
+            throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
+    }
+
+    public Set<TopicPartition> partitionsNeedingReset(long nowMs) {
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+            TopicPartitionState partitionState = state.value();
+            if (partitionState.awaitingReset() && partitionState.isResetAllowed(nowMs))
+                partitions.add(state.topicPartition());
+        }
+        return partitions;
+    }
+
     public boolean isAssigned(TopicPartition tp) {
         return assignment.contains(tp);
     }
@@ -417,12 +417,17 @@ public class SubscriptionState {
         assignedState(tp).resume();
     }
 
+    public void resetFailed(Set<TopicPartition> partitions, long nextRetryTimeMs) {
+        for (TopicPartition partition : partitions)
+            assignedState(partition).resetFailed(nextRetryTimeMs);
+    }
+
     public void movePartitionToEnd(TopicPartition tp) {
         assignment.moveToEnd(tp);
     }
 
-    public ConsumerRebalanceListener listener() {
-        return listener;
+    public ConsumerRebalanceListener rebalanceListener() {
+        return rebalanceListener;
     }
 
     public void addListener(Listener listener) {
@@ -446,36 +451,54 @@ public class SubscriptionState {
         private Long highWatermark; // the high watermark from last fetch
         private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
-        private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
         private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
+        private Long nextAllowedRetryTimeMs;
 
-        public TopicPartitionState() {
+        TopicPartitionState() {
             this.paused = false;
             this.position = null;
             this.highWatermark = null;
             this.logStartOffset = null;
             this.lastStableOffset = null;
-            this.committed = null;
             this.resetStrategy = null;
+            this.nextAllowedRetryTimeMs = null;
         }
 
-        private void awaitReset(OffsetResetStrategy strategy) {
+        private void reset(OffsetResetStrategy strategy) {
             this.resetStrategy = strategy;
             this.position = null;
+            this.nextAllowedRetryTimeMs = null;
+        }
+
+        private boolean isResetAllowed(long nowMs) {
+            return nextAllowedRetryTimeMs == null || nowMs >= nextAllowedRetryTimeMs;
         }
 
-        public boolean awaitingReset() {
+        private boolean awaitingReset() {
             return resetStrategy != null;
         }
 
-        public boolean hasValidPosition() {
+        private void setResetPending(long nextAllowedRetryTimeMs) {
+            this.nextAllowedRetryTimeMs = nextAllowedRetryTimeMs;
+        }
+
+        private void resetFailed(long nextAllowedRetryTimeMs) {
+            this.nextAllowedRetryTimeMs = nextAllowedRetryTimeMs;
+        }
+
+        private boolean hasValidPosition() {
             return position != null;
         }
 
+        private boolean isMissingPosition() {
+            return !hasValidPosition() && !awaitingReset();
+        }
+
         private void seek(long offset) {
             this.position = offset;
             this.resetStrategy = null;
+            this.nextAllowedRetryTimeMs = null;
         }
 
         private void position(long offset) {
@@ -484,10 +507,6 @@ public class SubscriptionState {
             this.position = offset;
         }
 
-        private void committed(OffsetAndMetadata offset) {
-            this.committed = offset;
-        }
-
         private void pause() {
             this.paused = true;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 65255fe..ed45e3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -358,6 +358,10 @@ public class MockClient implements KafkaClient {
         authenticationException.clear();
     }
 
+    public boolean hasPendingMetadataUpdates() {
+        return !metadataUpdates.isEmpty();
+    }
+
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
         metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
     }
@@ -386,6 +390,10 @@ public class MockClient implements KafkaClient {
         return !requests.isEmpty();
     }
 
+    public boolean hasPendingResponses() {
+        return !responses.isEmpty();
+    }
+
     @Override
     public int inFlightRequestCount(String node) {
         int result = 0;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index be8db2b..8c147a5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
@@ -118,6 +119,13 @@ public class KafkaConsumerTest {
     private final String topic3 = "test3";
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
 
+    private final int sessionTimeoutMs = 10000;
+    private final int heartbeatIntervalMs = 1000;
+
+    // Set auto commit interval lower than heartbeat so we don't need to deal with
+    // a concurrent heartbeat request
+    private final int autoCommitIntervalMs = 500;
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -125,7 +133,7 @@ public class KafkaConsumerTest {
     public void testConstructorClose() throws Exception {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "invalid-23-8409-adsfsdj");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -192,49 +200,30 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopicCollection() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        List<String> nullList = null;
-
-        try {
-            consumer.subscribe(nullList);
-        } finally {
-            consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.subscribe((List<String>) null);
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopic() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        String nullTopic = null;
-
-        try {
-            consumer.subscribe(singletonList(nullTopic));
-        } finally {
-            consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.subscribe(singletonList((String) null));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnEmptyTopic() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        String emptyTopic = "  ";
-
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            String emptyTopic = "  ";
             consumer.subscribe(singletonList(emptyTopic));
-        } finally {
-            consumer.close();
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullPattern() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        Pattern pattern = null;
-
-        try {
-            consumer.subscribe(pattern);
-        } finally {
-            consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.subscribe((Pattern) null);
         }
     }
 
@@ -243,64 +232,46 @@ public class KafkaConsumerTest {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
-
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
             consumer.subscribe(singletonList(topic));
-        } finally {
-            consumer.close();
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSeekNegative() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        try {
-            consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
             consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
-        } finally {
-            consumer.close();
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
             consumer.assign(null);
-        } finally {
-            consumer.close();
         }
     }
 
     @Test
     public void testAssignOnEmptyTopicPartition() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-
-        consumer.assign(Collections.<TopicPartition>emptyList());
-        assertTrue(consumer.subscription().isEmpty());
-        assertTrue(consumer.assignment().isEmpty());
-
-        consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(Collections.<TopicPartition>emptyList());
+            assertTrue(consumer.subscription().isEmpty());
+            assertTrue(consumer.assignment().isEmpty());
+        }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        try {
-            consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
-        } finally {
-            consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(singleton(new TopicPartition(null, 0)));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        try {
-            consumer.assign(Arrays.asList(new TopicPartition("  ", 0)));
-        } finally {
-            consumer.close();
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(singleton(new TopicPartition("  ", 0)));
         }
     }
 
@@ -312,7 +283,7 @@ public class KafkaConsumerTest {
             props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
             props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
 
-            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                     props, new StringDeserializer(), new StringDeserializer());
             assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
             assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
@@ -354,17 +325,16 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        return newConsumer(props);
+    }
 
+    private KafkaConsumer<byte[], byte[]> newConsumer(Properties props) {
         return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
+
     @Test
     public void verifyHeartbeatSent() throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 1000;
-        int autoCommitIntervalMs = 10000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -376,17 +346,15 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         // initial fetch
         client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
 
         consumer.poll(0);
-        assertEquals(Collections.singleton(tp0), consumer.assignment());
+        assertEquals(singleton(tp0), consumer.assignment());
 
         AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
 
@@ -402,11 +370,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 1000;
-        int autoCommitIntervalMs = 10000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -418,11 +381,9 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         consumer.poll(0);
 
@@ -444,11 +405,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 3000;
-        int heartbeatIntervalMs = 2000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -460,34 +416,155 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-        consumer.assign(Arrays.asList(tp0));
-        consumer.seekToBeginning(Arrays.asList(tp0));
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.assign(singleton(tp0));
+        consumer.seekToBeginning(singleton(tp0));
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
         client.prepareResponse(fetchResponse(tp0, 50L, 5));
 
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        ConsumerRecords<String, String> records = consumer.poll(5);
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
+    public void testFetchProgressWithMissingPartitionPosition() {
+        // Verifies that we can make progress on one partition while we are awaiting
+        // a reset on another partition.
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 2);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, metadata);
+        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.seekToEnd(singleton(tp0));
+        consumer.seekToBeginning(singleton(tp1));
+
+        client.prepareResponse(
+                new MockClient.RequestMatcher() {
+                    @Override
+                    public boolean matches(AbstractRequest body) {
+                        ListOffsetRequest request = (ListOffsetRequest) body;
+                        Map<TopicPartition, Long> expectedTimestamps = new HashMap<>();
+                        expectedTimestamps.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
+                        expectedTimestamps.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
+                        return expectedTimestamps.equals(request.partitionTimestamps());
+                    }
+                }, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
+                        Collections.singletonMap(tp1, Errors.NOT_LEADER_FOR_PARTITION)));
+        client.prepareResponse(
+                new MockClient.RequestMatcher() {
+                    @Override
+                    public boolean matches(AbstractRequest body) {
+                        FetchRequest request = (FetchRequest) body;
+                        return request.fetchData().keySet().equals(singleton(tp0)) &&
+                                request.fetchData().get(tp0).fetchOffset == 50L;
+
+                    }
+                }, fetchResponse(tp0, 50L, 5));
+
+        ConsumerRecords<String, String> records = consumer.poll(5);
+        assertEquals(5, records.count());
+        assertEquals(singleton(tp0), records.partitions());
+    }
+
+    @Test(expected = NoOffsetForPartitionException.class)
+    public void testMissingOffsetNoResetPolicy() {
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                OffsetResetStrategy.NONE, true);
+        consumer.assign(singletonList(tp0));
+
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // look up committed offset and find nothing
+        client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
+        consumer.poll(0);
+    }
+
+    @Test
+    public void testResetToCommittedOffset() {
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                OffsetResetStrategy.NONE, true);
+        consumer.assign(singletonList(tp0));
+
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
+        consumer.poll(0);
+
+        assertEquals(539L, consumer.position(tp0));
+    }
+
+    @Test
+    public void testResetUsingAutoResetPolicy() {
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                OffsetResetStrategy.LATEST, true);
+        consumer.assign(singletonList(tp0));
+
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
+
+        consumer.poll(0);
+
+        assertEquals(50L, consumer.position(tp0));
+    }
+
+    @Test
     public void testCommitsFetchedDuringAssign() {
         long offset1 = 10000;
         long offset2 = 20000;
 
-        int rebalanceTimeoutMs = 6000;
-        int sessionTimeoutMs = 3000;
-        int heartbeatIntervalMs = 2000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Cluster cluster = TestUtils.singletonCluster(topic, 2);
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = createMetadata();
@@ -497,8 +574,7 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
@@ -506,10 +582,7 @@ public class KafkaConsumerTest {
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
-        client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE),
-                coordinator);
-
+        client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
         assertEquals(offset1, consumer.committed(tp0).offset());
 
         consumer.assign(Arrays.asList(tp0, tp1));
@@ -529,14 +602,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testAutoCommitSentBeforePositionUpdate() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-
-        // adjust auto commit interval lower than heartbeat so we don't need to deal with
-        // a concurrent heartbeat request
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -548,11 +613,9 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         consumer.poll(0);
 
@@ -575,13 +638,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testRegexSubscription() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         String unmatchedTopic = "unmatched";
-
         Time time = new MockTime();
 
         Map<String, Integer> topicMetadata = new HashMap<>();
@@ -596,10 +653,7 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
-
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -614,10 +668,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testChangingRegexSubscription() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         String otherTopic = "other";
@@ -639,8 +689,7 @@ public class KafkaConsumerTest {
 
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
 
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -659,14 +708,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testWakeupWithFetchDataAvailable() throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        final int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-
-        // adjust auto commit interval lower than heartbeat so we don't need to deal with
-        // a concurrent heartbeat request
-        int autoCommitIntervalMs = 1000;
-
         final Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -678,11 +719,9 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         consumer.poll(0);
 
@@ -719,10 +758,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-
         final Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         final Node node = cluster.nodes().get(0);
@@ -734,11 +769,9 @@ public class KafkaConsumerTest {
         client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         consumer.poll(0);
 
@@ -756,14 +789,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void fetchResponseWithUnexpectedPartitionIsIgnored() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-
-        // adjust auto commit interval lower than heartbeat so we don't need to deal with
-        // a concurrent heartbeat request
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
         Node node = cluster.nodes().get(0);
@@ -775,9 +800,7 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
 
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -801,14 +824,6 @@ public class KafkaConsumerTest {
      */
     @Test
     public void testSubscriptionChangesWithAutoCommitEnabled() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-
-        // adjust auto commit interval lower than heartbeat so we don't need to deal with
-        // a concurrent heartbeat request
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -824,8 +839,7 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
 
         // initial subscription
         consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
@@ -925,11 +939,6 @@ public class KafkaConsumerTest {
      */
     @Test
     public void testSubscriptionChangesWithAutoCommitDisabled() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -944,33 +953,32 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
 
         // initial subscription
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
-        assertTrue(consumer.subscription().equals(Collections.singleton(topic)));
+        assertTrue(consumer.subscription().equals(singleton(topic)));
         assertTrue(consumer.assignment().isEmpty());
 
         // mock rebalance responses
-        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         consumer.poll(0);
 
         // verify that subscription is still the same, and now assignment has caught up
-        assertTrue(consumer.subscription().equals(Collections.singleton(topic)));
-        assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+        assertTrue(consumer.subscription().equals(singleton(topic)));
+        assertTrue(consumer.assignment().equals(singleton(tp0)));
 
         consumer.poll(0);
 
         // subscription change
-        consumer.subscribe(Arrays.asList(topic2), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(singleton(topic2), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
-        assertTrue(consumer.subscription().equals(Collections.singleton(topic2)));
-        assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+        assertTrue(consumer.subscription().equals(singleton(topic2)));
+        assertTrue(consumer.assignment().equals(singleton(tp0)));
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req: client.requests())
@@ -993,11 +1001,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testManualAssignmentChangeWithAutoCommitEnabled() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -1012,32 +1015,29 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
-        consumer.assign(Arrays.asList(tp0));
-        consumer.seekToBeginning(Arrays.asList(tp0));
+        consumer.assign(singleton(tp0));
+        consumer.seekToBeginning(singleton(tp0));
 
         // fetch offset for one topic
-        client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
-                coordinator);
+        client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator);
         assertEquals(0, consumer.committed(tp0).offset());
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+        assertTrue(consumer.assignment().equals(singleton(tp0)));
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        ConsumerRecords<String, String> records = consumer.poll(5);
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
 
@@ -1045,10 +1045,10 @@ public class KafkaConsumerTest {
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
 
         // new manual assignment
-        consumer.assign(Arrays.asList(t2p0));
+        consumer.assign(singleton(t2p0));
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
+        assertTrue(consumer.assignment().equals(singleton(t2p0)));
         // verify that the offset commits occurred as expected
         assertTrue(commitReceived.get());
 
@@ -1058,11 +1058,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testManualAssignmentChangeWithAutoCommitDisabled() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -1077,16 +1072,15 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
-        consumer.assign(Arrays.asList(tp0));
-        consumer.seekToBeginning(Arrays.asList(tp0));
+        consumer.assign(singleton(tp0));
+        consumer.seekToBeginning(singleton(tp0));
 
         // fetch offset for one topic
         client.prepareResponseFrom(
@@ -1095,22 +1089,22 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.committed(tp0).offset());
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+        assertTrue(consumer.assignment().equals(singleton(tp0)));
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        ConsumerRecords<String, String> records = consumer.poll(5);
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
 
         // new manual assignment
-        consumer.assign(Arrays.asList(t2p0));
+        consumer.assign(singleton(t2p0));
 
         // verify that assignment immediately changes
-        assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
+        assertTrue(consumer.assignment().equals(singleton(t2p0)));
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req : client.requests())
@@ -1122,11 +1116,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testOffsetOfPausedPartitions() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 2);
         Node node = cluster.nodes().get(0);
@@ -1138,8 +1127,7 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
 
         // lookup coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1171,7 +1159,7 @@ public class KafkaConsumerTest {
         final Map<TopicPartition, Long> offsetResponse = new HashMap<>();
         offsetResponse.put(tp0, 3L);
         offsetResponse.put(tp1, 3L);
-        client.prepareResponse(listOffsetsResponse(offsetResponse, Errors.NONE));
+        client.prepareResponse(listOffsetsResponse(offsetResponse));
         assertEquals(3L, consumer.position(tp0));
         assertEquals(3L, consumer.position(tp1));
 
@@ -1182,11 +1170,8 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
             consumer.poll(0);
-        } finally {
-            consumer.close();
         }
     }
 
@@ -1225,7 +1210,7 @@ public class KafkaConsumerTest {
         Map<TopicPartition, Errors> response = new HashMap<>();
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
-        consumerCloseTest(5000, Arrays.asList(commitResponse), 5000, false);
+        consumerCloseTest(5000, singletonList(commitResponse), 5000, false);
     }
 
     @Test
@@ -1262,10 +1247,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 500;
-
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
@@ -1277,16 +1258,14 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                                                                   rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);
-
-        consumer.subscribe(Collections.singleton(topic), getConsumerRebalanceListener(consumer));
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
 
         client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
-        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
 
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
         client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
@@ -1302,22 +1281,22 @@ public class KafkaConsumerTest {
         }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator);
 
         // join group
-        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(Collections.singletonList(topic)));
+        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
 
         // This member becomes the leader
         final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId",
-                                                                 Collections.<String, ByteBuffer>singletonMap("memberId", byteBuffer));
+                Collections.singletonMap("memberId", byteBuffer));
         client.prepareResponseFrom(leaderResponse, coordinator);
 
         // sync group fails due to disconnect
-        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator, true);
+        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator, true);
 
         // should try and find the new coordinator
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
 
         // rejoin group
         client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
-        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
 
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
@@ -1333,12 +1312,9 @@ public class KafkaConsumerTest {
     }
 
     private void consumerCloseTest(final long closeTimeoutMs,
-            List<? extends AbstractResponse> responses,
-            long waitMs,
-            boolean interrupt) throws Exception {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 5000;
+                                   List<? extends AbstractResponse> responses,
+                                   long waitMs,
+                                   boolean interrupt) throws Exception {
 
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
@@ -1351,11 +1327,9 @@ public class KafkaConsumerTest {
         client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);
-
-        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
@@ -1434,11 +1408,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
-        int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
-        int autoCommitIntervalMs = 1000;
-
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -1453,9 +1422,7 @@ public class KafkaConsumerTest {
         client.authenticationFailed(node, 300);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         consumer.subscribe(Collections.singleton(topic));
         callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
 
@@ -1634,15 +1601,28 @@ public class KafkaConsumerTest {
         return new OffsetFetchResponse(Errors.NONE, partitionData);
     }
 
-    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, Errors error) {
+    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
+        return listOffsetsResponse(offsets, Collections.<TopicPartition, Errors>emptyMap());
+    }
+
+    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets,
+                                                   Map<TopicPartition, Errors> partitionErrors) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
-        for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
-            partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
-                    1L, partitionOffset.getValue()));
+        for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
+            partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(Errors.NONE,
+                    ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionOffset.getValue()));
         }
+
+        for (Map.Entry<TopicPartition, Errors> partitionError : partitionErrors.entrySet()) {
+            partitionData.put(partitionError.getKey(), new ListOffsetResponse.PartitionData(
+                    partitionError.getValue(), ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                    ListOffsetResponse.UNKNOWN_OFFSET));
+        }
+
         return new ListOffsetResponse(partitionData);
     }
 
+
     private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
         LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
         for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
@@ -1674,13 +1654,22 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
-                                                      int rebalanceTimeoutMs,
-                                                      int sessionTimeoutMs,
-                                                      int heartbeatIntervalMs,
-                                                      boolean autoCommitEnabled,
-                                                      int autoCommitIntervalMs) {
-        // create a consumer with mocked time and mocked network
+                                                      boolean autoCommitEnabled) {
+        return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled);
+    }
+
+    private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
+                                                                  KafkaClient client,
+                                                                  Metadata metadata) {
+        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false);
+    }
 
+    private KafkaConsumer<String, String> newConsumer(Time time,
+                                                      KafkaClient client,
+                                                      Metadata metadata,
+                                                      PartitionAssignor assignor,
+                                                      OffsetResetStrategy resetStrategy,
+                                                      boolean autoCommitEnabled) {
         String clientId = "mock-consumer";
         String groupId = "mock-group";
         String metricGroupPrefix = "consumer";
@@ -1693,18 +1682,18 @@ public class KafkaConsumerTest {
         int fetchSize = 1024 * 1024;
         int maxPollRecords = Integer.MAX_VALUE;
         boolean checkCrcs = true;
-
+        int rebalanceTimeoutMs = 60000;
+        
         Deserializer<String> keyDeserializer = new StringDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
-        OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
-        List<PartitionAssignor> assignors = Arrays.asList(assignor);
+        List<PartitionAssignor> assignors = singletonList(assignor);
         ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());
 
         Metrics metrics = new Metrics();
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
 
-        SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
+        SubscriptionState subscriptions = new SubscriptionState(resetStrategy);
         LogContext loggerFactory = new LogContext();
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
@@ -1745,6 +1734,7 @@ public class KafkaConsumerTest {
                 metricsRegistry.fetcherMetrics,
                 time,
                 retryBackoffMs,
+                requestTimeoutMs,
                 IsolationLevel.READ_UNCOMMITTED);
 
         return new KafkaConsumer<>(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6ed46f7..67f8e8d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 
@@ -30,11 +30,11 @@ import static org.junit.Assert.assertFalse;
 
 public class MockConsumerTest {
     
-    private MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
+    private MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
     @Test
     public void testSimpleMock() {
-        consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
+        consumer.subscribe(Collections.singleton("test"));
         assertEquals(0, consumer.poll(1000).count());
         consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
         // Mock consumers need to seek manually since they cannot automatically reset offsets
@@ -43,8 +43,8 @@ public class MockConsumerTest {
         beginningOffsets.put(new TopicPartition("test", 1), 0L);
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
         ConsumerRecords<String, String> recs = consumer.poll(1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fdaa6b3..2e5c960 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1007,8 +1007,6 @@ public class ConsumerCoordinatorTest {
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
     }
 
     @Test
@@ -1047,7 +1045,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -1063,8 +1061,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+        assertFalse(client.hasPendingResponses());
     }
 
     @Test
@@ -1072,7 +1069,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -1092,14 +1089,13 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+        assertFalse(client.hasPendingResponses());
     }
 
     @Test
     public void testAutoCommitManualAssignment() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1110,14 +1106,13 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+        assertFalse(client.hasPendingResponses());
     }
 
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1127,8 +1122,6 @@ public class ConsumerCoordinatorTest {
         time.sleep(autoCommitIntervalMs);
         consumerClient.poll(0);
 
-        assertNull(subscriptions.committed(t1p));
-
         // now find the coordinator
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
@@ -1137,8 +1130,7 @@ public class ConsumerCoordinatorTest {
         time.sleep(retryBackoffMs);
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+        assertFalse(client.hasPendingResponses());
     }
 
     @Test
@@ -1151,12 +1143,11 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "hello")), callback(success));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
-
-        assertEquals(100L, subscriptions.committed(t1p).offset());
-        assertEquals("hello", subscriptions.committed(t1p).metadata());
     }
 
     @Test
@@ -1428,11 +1419,12 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+
+        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertTrue(subscriptions.hasAllFetchPositions());
+        assertEquals(100L, subscriptions.position(t1p).longValue());
     }
 
     @Test
@@ -1441,12 +1433,13 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+
+        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertTrue(subscriptions.hasAllFetchPositions());
+        assertEquals(100L, subscriptions.position(t1p).longValue());
     }
 
     @Test
@@ -1455,7 +1448,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
         try {
             coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1471,7 +1463,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
     }
@@ -1482,13 +1473,14 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(t1p).offset());
+
+        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertTrue(subscriptions.hasAllFetchPositions());
+        assertEquals(100L, subscriptions.position(t1p).longValue());
     }
 
     @Test
@@ -1497,11 +1489,42 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(null, subscriptions.committed(t1p));
+
+        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds()));
+        assertFalse(subscriptions.hasAllFetchPositions());
+        assertEquals(null, subscriptions.position(t1p));
+    }
+
+    @Test
+    public void testNoCoordinatorDiscoveryIfPositionsKnown() {
+        assertTrue(coordinator.coordinatorUnknown());
+
+        subscriptions.assignFromUser(singleton(t1p));
+        subscriptions.seek(t1p, 500L);
+        coordinator.refreshCommittedOffsetsIfNeeded();
+
+        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertTrue(subscriptions.hasAllFetchPositions());
+        assertEquals(500L, subscriptions.position(t1p).longValue());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testNoCoordinatorDiscoveryIfPartitionAwaitingReset() {
+        assertTrue(coordinator.coordinatorUnknown());
+
+        subscriptions.assignFromUser(singleton(t1p));
+        subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST);
+        coordinator.refreshCommittedOffsetsIfNeeded();
+
+        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertFalse(subscriptions.hasAllFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.partitionsNeedingReset(time.milliseconds()));
+        assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p));
+        assertTrue(coordinator.coordinatorUnknown());
     }
 
     @Test
@@ -1664,7 +1687,7 @@ public class ConsumerCoordinatorTest {
         time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
         coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
         assertFalse(coordinator.coordinatorUnknown());
-        assertEquals(subscriptions.committed(t1p).offset(), 100L);
+        assertEquals(100L, subscriptions.position(t1p).longValue());
     }
 
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
@@ -1839,6 +1862,18 @@ public class ConsumerCoordinatorTest {
         };
     }
 
+    private OffsetCommitCallback callback(final Map<TopicPartition, OffsetAndMetadata> expectedOffsets,
+                                          final AtomicBoolean success) {
+        return new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                if (expectedOffsets.equals(offsets) && exception == null)
+                    success.set(true);
+            }
+        };
+    }
+
+
     private static class MockCommitCallback implements OffsetCommitCallback {
         public int invoked = 0;
         public Exception exception = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index de621b4..e8bb4e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -25,8 +25,6 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -124,6 +122,7 @@ public class FetcherTest {
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
     private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 30000;
     private MockTime time = new MockTime(1);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
@@ -899,27 +898,12 @@ public class FetcherTest {
     }
 
     @Test
-    public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
-        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp0, tp1));
-        subscriptionsNoAutoReset.assignFromUser(tps);
-        try {
-            fetcherNoAutoReset.updateFetchPositions(tps);
-            fail("Should have thrown NoOffsetForPartitionException");
-        } catch (NoOffsetForPartitionException e) {
-            // we expect the exception to be thrown for both TPs at the same time
-            Set<TopicPartition> partitions = e.partitions();
-            assertEquals(tps, partitions);
-        }
-    }
-
-    @Test
-    public void testUpdateFetchPositionToCommitted() {
-        // unless a specific reset is expected, the default behavior is to reset to the committed
-        // position if one is present
+    public void testUpdateFetchPositionNoOpWithPositionSet() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.committed(tp0, new OffsetAndMetadata(5));
+        subscriptions.seek(tp0, 5L);
 
-        fetcher.updateFetchPositions(singleton(tp0));
+        fetcher.resetOffsetsIfNeeded();
+        assertFalse(client.hasInFlightRequests());
         assertTrue(subscriptions.isFetchable(tp0));
         assertEquals(5, subscriptions.position(tp0).longValue());
     }
@@ -927,11 +911,12 @@ public class FetcherTest {
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
         subscriptions.assignFromUser(singleton(tp0));
-        // with no commit position, we should reset using the default strategy defined above (EARLIEST)
+        subscriptions.requestOffsetReset(tp0);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp0));
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertTrue(subscriptions.isFetchable(tp0));
         assertEquals(5, subscriptions.position(tp0).longValue());
@@ -940,11 +925,12 @@ public class FetcherTest {
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp0));
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertTrue(subscriptions.isFetchable(tp0));
         assertEquals(5, subscriptions.position(tp0).longValue());
@@ -957,7 +943,7 @@ public class FetcherTest {
                     new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
 
             subscriptions.assignFromUser(singleton(tp0));
-            subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
+            subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
             client.prepareResponse(new MockClient.RequestMatcher() {
                 @Override
@@ -966,7 +952,9 @@ public class FetcherTest {
                     return request.isolationLevel() == isolationLevel;
                 }
             }, listOffsetResponse(Errors.NONE, 1L, 5L));
-            fetcher.updateFetchPositions(singleton(tp0));
+            fetcher.resetOffsetsIfNeeded();
+            consumerClient.pollNoWakeup();
+
             assertFalse(subscriptions.isOffsetResetNeeded(tp0));
             assertTrue(subscriptions.isFetchable(tp0));
             assertEquals(5, subscriptions.position(tp0).longValue());
@@ -976,11 +964,42 @@ public class FetcherTest {
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp0));
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
+    }
+
+    @Test
+    public void testResetOffsetsMetadataRefresh() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // First fetch fails with stale metadata
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+
+        // Expect a metadata refresh
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        consumerClient.pollNoWakeup();
+        assertFalse(client.hasPendingMetadataUpdates());
+
+        // Next fetch succeeds
+        time.sleep(retryBackoffMs);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertTrue(subscriptions.isFetchable(tp0));
         assertEquals(5, subscriptions.position(tp0).longValue());
@@ -989,76 +1008,205 @@ public class FetcherTest {
     @Test
     public void testUpdateFetchPositionDisconnect() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 5L), true);
+                listOffsetResponse(Errors.NONE, 1L, 5L), true);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+
+        // Expect a metadata refresh
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        consumerClient.pollNoWakeup();
+        assertFalse(client.hasPendingMetadataUpdates());
+
+        // No retry until the backoff passes
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(subscriptions.hasValidPosition(tp0));
 
         // Next one succeeds
+        time.sleep(retryBackoffMs);
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp0));
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertTrue(subscriptions.isFetchable(tp0));
         assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
-    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
+    public void testAssignmentChangeWithInFlightReset() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.committed(tp0, new OffsetAndMetadata(0));
-        subscriptions.pause(tp0); // paused partition does not have a valid position
-        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Send the ListOffsets request to reset the position
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(client.hasInFlightRequests());
+
+        // Now we have an assignment change
+        subscriptions.assignFromUser(singleton(tp1));
+
+        // The response returns and is discarded
+        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
+        consumerClient.pollNoWakeup();
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(subscriptions.isAssigned(tp0));
+    }
+
+    @Test
+    public void testSeekWithInFlightReset() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Send the ListOffsets request to reset the position
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(client.hasInFlightRequests());
+
+        // Now we get a seek from the user
+        subscriptions.seek(tp0, 237);
+
+        // The response returns and is discarded
+        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
+        consumerClient.pollNoWakeup();
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(237L, subscriptions.position(tp0).longValue());
+    }
+
+    @Test
+    public void testChangeResetWithInFlightReset() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Send the ListOffsets request to reset the position
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(client.hasInFlightRequests());
+
+        // Now the user requests a reset with a different strategy
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
 
+        // The response returns and is discarded
+        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
+        consumerClient.pollNoWakeup();
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0));
+    }
+
+    @Test
+    public void testIdempotentResetWithInFlightReset() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Send the ListOffsets request to reset the position
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(client.hasInFlightRequests());
+
+        // Now the user requests the same reset again (should be idempotent)
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
+        consumerClient.pollNoWakeup();
+
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(5L, subscriptions.position(tp0).longValue());
+    }
+
+    @Test
+    public void testRestOffsetsAuthorizationFailure() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // First request fails with a topic authorization error
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+
+        try {
+            fetcher.resetOffsetsIfNeeded();
+            fail("Expected authorization error to be raised");
+        } catch (TopicAuthorizationException e) {
+            assertEquals(singleton(tp0.topic()), e.unauthorizedTopics());
+        }
+
+        // The exception should clear after being raised, but no retry until the backoff
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(subscriptions.hasValidPosition(tp0));
+
+        // Next one succeeds
+        time.sleep(retryBackoffMs);
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 10L));
-        fetcher.updateFetchPositions(singleton(tp0));
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
 
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
-        assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp0));
-        assertEquals(10, subscriptions.position(tp0).longValue());
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
-    public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
+    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.pause(tp0); // paused partition does not have a valid position
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
-        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 1L, 0L));
-        fetcher.updateFetchPositions(singleton(tp0));
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, 1L, 10L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
 
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
         assertTrue(subscriptions.hasValidPosition(tp0));
-        assertEquals(0, subscriptions.position(tp0).longValue());
+        assertEquals(10, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.committed(tp0, new OffsetAndMetadata(0));
+        subscriptions.requestOffsetReset(tp0);
         subscriptions.pause(tp0); // paused partition does not have a valid position
-        subscriptions.seek(tp0, 10);
 
-        fetcher.updateFetchPositions(singleton(tp0));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
         assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp0));
-        assertEquals(10, subscriptions.position(tp0).longValue());
+        assertFalse(subscriptions.hasValidPosition(tp0));
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
         subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.committed(tp0, new OffsetAndMetadata(0));
         subscriptions.seek(tp0, 10);
         subscriptions.pause(tp0); // paused partition already has a valid position
 
-        fetcher.updateFetchPositions(singleton(tp0));
+        fetcher.resetOffsetsIfNeeded();
 
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
@@ -1480,7 +1628,7 @@ public class FetcherTest {
     @Test
     public void testGetOffsetsForTimesTimeout() {
         try {
-            fetcher.getOffsetsByTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), 100L);
+            fetcher.offsetsByTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), 100L);
             fail("Should throw timeout exception.");
         } catch (TimeoutException e) {
             // let it go.
@@ -1490,7 +1638,7 @@ public class FetcherTest {
     @Test
     public void testGetOffsetsForTimes() {
         // Empty map
-        assertTrue(fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());
+        assertTrue(fetcher.offsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());
         // Unknown Offset
         testGetOffsetsForTimesWithUnknownOffset();
         // Error code none with unknown offset
@@ -1521,7 +1669,7 @@ public class FetcherTest {
         offsetsToSearch.put(tp0, ListOffsetRequest.EARLIEST_TIMESTAMP);
         offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
 
-        fetcher.getOffsetsByTimes(offsetsToSearch, 0);
+        fetcher.offsetsByTimes(offsetsToSearch, 0);
     }
 
     @Test
@@ -2098,7 +2246,7 @@ public class FetcherTest {
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(t2p0, 0L);
         timestampToSearch.put(tp1, 0L);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.offsetsByTimes(timestampToSearch, Long.MAX_VALUE);
 
         if (expectedOffsetForP0 == null)
             assertNull(offsetAndTimestampMap.get(t2p0));
@@ -2129,7 +2277,7 @@ public class FetcherTest {
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, 0L);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.offsetsByTimes(timestampToSearch, Long.MAX_VALUE);
 
         assertTrue(offsetAndTimestampMap.containsKey(tp0));
         assertNull(offsetAndTimestampMap.get(tp0));
@@ -2246,6 +2394,7 @@ public class FetcherTest {
                 metricsRegistry,
                 time,
                 retryBackoffMs,
+                requestTimeoutMs,
                 isolationLevel);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c0a2df9..24255e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
@@ -51,11 +50,9 @@ public class SubscriptionStateTest {
         state.assignFromUser(singleton(tp0));
         assertEquals(singleton(tp0), state.assignedPartitions());
         assertFalse(state.hasAllFetchPositions());
-        assertTrue(state.refreshCommitsNeeded());
-        state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
-        assertAllPositions(tp0, 1L);
+        assertEquals(1L, state.position(tp0).longValue());
         state.assignFromUser(Collections.<TopicPartition>emptySet());
         assertTrue(state.assignedPartitions().isEmpty());
         assertFalse(state.isAssigned(tp0));
@@ -156,7 +153,7 @@ public class SubscriptionStateTest {
         state.assignFromUser(singleton(tp0));
         state.seek(tp0, 5);
         assertEquals(5L, (long) state.position(tp0));
-        state.needOffsetReset(tp0);
+        state.requestOffsetReset(tp0);
         assertFalse(state.isFetchable(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
         assertEquals(null, state.position(tp0));
@@ -175,8 +172,7 @@ public class SubscriptionStateTest {
         assertTrue(state.partitionsAutoAssigned());
         state.assignFromSubscribed(singleton(tp0));
         state.seek(tp0, 1);
-        state.committed(tp0, new OffsetAndMetadata(1));
-        assertAllPositions(tp0, 1L);
+        assertEquals(1L, state.position(tp0).longValue());
         state.assignFromSubscribed(singleton(tp1));
         assertTrue(state.isAssigned(tp1));
         assertFalse(state.isAssigned(tp0));
@@ -195,15 +191,6 @@ public class SubscriptionStateTest {
         assertTrue(state.isFetchable(tp0));
     }
 
-    @Test
-    public void commitOffsetMetadata() {
-        state.assignFromUser(singleton(tp0));
-        state.committed(tp0, new OffsetAndMetadata(5, "hi"));
-
-        assertEquals(5, state.committed(tp0).offset());
-        assertEquals("hi", state.committed(tp0).metadata());
-    }
-
     @Test(expected = IllegalStateException.class)
     public void invalidPositionUpdate() {
         state.subscribe(singleton(topic), rebalanceListener);
@@ -295,11 +282,6 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
     }
 
-    private void assertAllPositions(TopicPartition tp, Long offset) {
-        assertEquals(offset.longValue(), state.committed(tp).offset());
-        assertEquals(offset, state.position(tp));
-    }
-
     private static class MockRebalanceListener implements ConsumerRebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 4cdc989..ee38762 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -827,15 +827,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       keyDeserializer = new StringDeserializer,
       valueDeserializer = new StringDeserializer)
     consumer.subscribe(Collections.singleton(topic))
-    if (autoOffsetReset == "latest") {
-      do {
-        consumer.poll(1)
-      } while (consumer.assignment.isEmpty)
-    }
+    if (autoOffsetReset == "latest")
+      awaitInitialPositions(consumer)
     consumers += consumer
     consumer
   }
 
+  private def awaitInitialPositions(consumer: KafkaConsumer[_, _]): Unit = {
+    do {
+      consumer.poll(1)
+    } while (consumer.assignment.isEmpty)
+    consumer.assignment.asScala.foreach(tp => consumer.position(tp))
+  }
+
   private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: String): Properties = {
     val props = new Properties
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
@@ -875,9 +879,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       valueDeserializer = new StringDeserializer,
       props = Some(clientProps(securityProtocol, saslMechanism)))
     consumer.subscribe(Collections.singleton(topic))
-    do {
-      consumer.poll(1)
-    } while (consumer.assignment.isEmpty)
+    awaitInitialPositions(consumer)
     consumers += consumer
     consumer
   }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message