kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/4] kafka git commit: KAFKA-3888: send consumer heartbeats from a background thread (KIP-62)
Date Wed, 17 Aug 2016 18:50:11 GMT
KAFKA-3888: send consumer heartbeats from a background thread (KIP-62)

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #1627 from hachikuji/KAFKA-3888


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40b1dd3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40b1dd3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40b1dd3f

Branch: refs/heads/trunk
Commit: 40b1dd3f495a59abef8a0cba5450526994c92c04
Parents: 19997ed
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Aug 17 11:50:04 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 17 11:50:04 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/CommitFailedException.java |   9 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  39 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  78 ++-
 .../consumer/internals/AbstractCoordinator.java | 532 ++++++++++++-------
 .../consumer/internals/ConsumerCoordinator.java | 264 +++++----
 .../internals/ConsumerNetworkClient.java        | 267 ++++++----
 .../clients/consumer/internals/DelayedTask.java |  24 -
 .../consumer/internals/DelayedTaskQueue.java    |  96 ----
 .../clients/consumer/internals/Fetcher.java     |  21 +-
 .../clients/consumer/internals/Heartbeat.java   |  56 +-
 .../consumer/internals/RequestFuture.java       |   2 +-
 .../apache/kafka/common/protocol/Protocol.java  |  25 +-
 .../kafka/common/requests/JoinGroupRequest.java |  49 +-
 .../common/requests/JoinGroupResponse.java      |  15 +-
 .../common/requests/OffsetFetchResponse.java    |   2 -
 .../clients/consumer/KafkaConsumerTest.java     |  49 +-
 .../internals/AbstractCoordinatorTest.java      |  68 ++-
 .../internals/ConsumerCoordinatorTest.java      | 179 ++++---
 .../internals/ConsumerNetworkClientTest.java    |  26 +-
 .../internals/DelayedTaskQueueTest.java         |  89 ----
 .../clients/consumer/internals/FetcherTest.java |   1 +
 .../consumer/internals/HeartbeatTest.java       |   6 +-
 .../common/requests/RequestResponseTest.java    |  14 +-
 .../runtime/distributed/DistributedConfig.java  |  29 +-
 .../runtime/distributed/WorkerCoordinator.java  |  38 +-
 .../runtime/distributed/WorkerGroupMember.java  |  15 +-
 .../distributed/WorkerCoordinatorTest.java      |   2 +
 core/src/main/scala/kafka/api/ApiVersion.scala  |  11 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  90 ++--
 .../scala/kafka/coordinator/GroupMetadata.scala |   4 +-
 .../coordinator/GroupMetadataManager.scala      | 150 ++++--
 .../kafka/coordinator/MemberMetadata.scala      |   1 +
 .../src/main/scala/kafka/server/KafkaApis.scala |   6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../kafka/api/BaseConsumerTest.scala            | 170 +-----
 .../kafka/api/ConsumerBounceTest.scala          |  13 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 223 ++++++--
 .../SaslPlainSslEndToEndAuthorizationTest.scala |   1 -
 .../GroupCoordinatorResponseTest.scala          | 213 +++++---
 .../coordinator/GroupMetadataManagerTest.scala  |  11 +-
 .../kafka/coordinator/GroupMetadataTest.scala   |  59 +-
 .../kafka/coordinator/MemberMetadataTest.scala  |  16 +-
 .../unit/kafka/utils/timer/MockTimer.scala      |   2 +-
 43 files changed, 1686 insertions(+), 1281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index 26ef48e..5695be8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -28,7 +28,12 @@ public class CommitFailedException extends KafkaException {
 
     private static final long serialVersionUID = 1L;
 
-    public CommitFailedException(String message) {
-        super(message);
+    public CommitFailedException() {
+        super("Commit cannot be completed since the group has already " +
+                "rebalanced and assigned the partitions to another member. This means that the time " +
+                "between subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time message processing. " +
+                "You can address this either by increasing the session timeout or by reducing the maximum " +
+                "size of batches returned in poll() with max.poll.records.");
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index de10bed..509c3a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -48,24 +48,33 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
 
+    /** <code>max.poll.interval.ms</code> */
+    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
+    private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
+            "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
+            "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
+            "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
+
     /**
      * <code>session.timeout.ms</code>
      */
     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's " +
-            "group management facilities. When a consumer's heartbeat is not received within the session timeout, " +
-            "the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only " +
-            "when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's " +
-            "poll loop at the cost of a longer time to detect hard failures. See also <code>" + MAX_POLL_RECORDS_CONFIG + "</code> for " +
-            "another option to control the processing time in the poll loop. Note that the value must be in the " +
-            "allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " +
+            "Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " +
+            "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
+            "then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " +
+            "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
             "and <code>group.max.session.timeout.ms</code>.";
 
     /**
      * <code>heartbeat.interval.ms</code>
      */
     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
-    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
+            "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
+            "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
+            "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
+            "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
 
     /**
      * <code>bootstrap.servers</code>
@@ -196,7 +205,7 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
-                                        30000,
+                                        10000,
                                         Importance.HIGH,
                                         SESSION_TIMEOUT_MS_DOC)
                                 .define(HEARTBEAT_INTERVAL_MS_CONFIG,
@@ -221,7 +230,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         ENABLE_AUTO_COMMIT_DOC)
                                 .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
+                                        Type.INT,
                                         5000,
                                         atLeast(0),
                                         Importance.LOW,
@@ -311,7 +320,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         VALUE_DESERIALIZER_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
-                                        40 * 1000,
+                                        305000, // chosen to be higher than the default of max.poll.interval.ms
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
@@ -328,10 +337,16 @@ public class ConsumerConfig extends AbstractConfig {
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(MAX_POLL_RECORDS_CONFIG,
                                         Type.INT,
-                                        Integer.MAX_VALUE,
+                                        500,
                                         atLeast(1),
                                         Importance.MEDIUM,
                                         MAX_POLL_RECORDS_DOC)
+                                .define(MAX_POLL_INTERVAL_MS_CONFIG,
+                                        Type.INT,
+                                        300000,
+                                        atLeast(1),
+                                        Importance.MEDIUM,
+                                        MAX_POLL_INTERVAL_MS_DOC)
                                 .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
                                         Type.BOOLEAN,
                                         DEFAULT_EXCLUDE_INTERNAL_TOPICS,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 522cfde..ef91302 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -137,32 +137,31 @@ import java.util.regex.Pattern;
  * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
  * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
  * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
- * the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
- * then no heartbeats will be sent. If a period of the configured <i>session timeout</i> elapses before the server
- * has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
- * This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
- * it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
- * have to prove you are still alive by calling poll.
+ * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
+ * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
+ * be reassigned. It is also possible that the consumer could encounter a "livelock" situation where it is continuing
+ * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
+ * indefinitely in this case, we provide a liveness detection mechanism: basically if you don't call poll at least
+ * as frequently as the configured <code>max.poll.interval.ms</code>, then the client will proactively leave the group
+ * so that another consumer can take over its partitions. So to stay in the group, you must continue to call poll.
  * <p>
  * The implication of this design is that message processing time in the poll loop must be bounded so that
- * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
- * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
- * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
- * guarantees that only active members of the group are allowed to commit offsets. If the consumer
- * has been kicked out of the group, then its partitions will have been assigned to another member, which will be
- * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ * you always ensure that poll() is called at least once every poll interval. If not, then the consumer leaves
+ * the group, which typically results in an offset commit failure when the processing of the polled records
+ * finally completes (this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}).
+ * This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
+ * If the consumer has been kicked out of the group, then its partitions will have been assigned to another member,
+ * which will be committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
  * <p>
- * The consumer provides two configuration settings to control this behavior:
+ * The consumer provides two configuration settings to control the behavior of the poll loop:
  * <ol>
- *     <li><code>session.timeout.ms</code>: By increasing the session timeout, you can give the consumer more
- *     time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
- *     will take longer for the server to detect hard consumer failures, which can cause a delay before
- *     a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
- *     the consumer will send an explicit message to the server to leave the group and cause an immediate
- *     rebalance.</li>
- *     <li><code>max.poll.records</code>: Processing time in the poll loop is typically proportional to the number
- *     of records processed, so it's natural to want to set a limit on the number of records handled at once.
- *     This setting provides that. By default, there is essentially no limit.</li>
+ *     <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
+ *     the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
+ *     is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
+ *     inside the call to poll.</li>
+ *     <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
+ *     call to poll. This can make it easier to predict the maximum that must be handled within each poll
+ *     interval.</li>
  * </ol>
  * <p>
  * For use cases where message processing time varies unpredictably, neither of these options may be viable.
@@ -187,7 +186,6 @@ import java.util.regex.Pattern;
  *     props.put(&quot;group.id&quot;, &quot;test&quot;);
  *     props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
  *     props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
- *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
@@ -210,13 +208,6 @@ import java.util.regex.Pattern;
  * In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
  * called <i>test</i> as described above.
  * <p>
- * The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. Note that
- * the consumer is single-threaded, so periodic heartbeats can only be sent when {@link #poll(long)} is called. As long as
- * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
- * to it. If it stops heartbeating by failing to call {@link #poll(long)} for a period of time longer than <code>session.timeout.ms</code>
- * then it will be considered dead and its partitions will be assigned to another process.
- * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
  *
@@ -242,7 +233,6 @@ import java.util.regex.Pattern;
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
  *     props.put(&quot;group.id&quot;, &quot;test&quot;);
  *     props.put(&quot;enable.auto.commit&quot;, &quot;false&quot;);
- *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
@@ -645,6 +635,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
             this.coordinator = new ConsumerCoordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     assignors,
@@ -656,7 +647,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     retryBackoffMs,
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                     this.interceptors,
                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
             if (keyDeserializer == null) {
@@ -715,6 +706,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   Metrics metrics,
                   SubscriptionState subscriptions,
                   Metadata metadata,
+                  boolean autoCommitEnabled,
+                  int autoCommitIntervalMs,
+                  int heartbeatIntervalMs,
                   long retryBackoffMs,
                   long requestTimeoutMs) {
         this.clientId = clientId;
@@ -970,7 +964,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow
                     // wakeups or any other errors to be triggered prior to returning the fetched records.
-                    // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
                     fetcher.sendFetches();
                     client.pollNoWakeup();
 
@@ -997,30 +990,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The fetched records (may be empty)
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
-        // ensure we have partitions assigned if we expect to
-        if (subscriptions.partitionsAutoAssigned())
-            coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
         if (!subscriptions.hasAllFetchPositions())
             updateFetchPositions(this.subscriptions.missingFetchPositions());
 
-        long now = time.milliseconds();
-
-        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
-        client.executeDelayedTasks(now);
-
-        // init any new fetches (won't resend pending fetches)
+        // if data is available already, return it immediately
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-
-        // if data is available already, e.g. from a previous network client poll() call to commit,
-        // then just return it immediately
         if (!records.isEmpty())
             return records;
 
+        // send any new fetches (won't resend pending fetches)
         fetcher.sendFetches();
-        client.poll(timeout, now);
+
+        long now = time.milliseconds();
+        client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
         return fetcher.fetchedRecords();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e957856..690df26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -53,6 +54,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * AbstractCoordinator implements group management for a single group member by interacting with
@@ -77,26 +79,38 @@ import java.util.concurrent.TimeUnit;
  * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
  * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
  *
+ * Note on locking: this class shares state between the caller and a background thread which is
+ * used for sending heartbeats after the client has joined the group. All mutable state as well as
+ * state transitions are protected with the class's monitor. Generally this means acquiring the lock
+ * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
+ * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
  */
 public abstract class AbstractCoordinator implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
 
-    private final Heartbeat heartbeat;
-    private final HeartbeatTask heartbeatTask;
+    private enum MemberState {
+        UNJOINED,    // the client is not part of a group
+        REBALANCING, // the client has begun rebalancing
+        STABLE,      // the client has joined and is sending heartbeats
+    }
+
+    private final int rebalanceTimeoutMs;
     private final int sessionTimeoutMs;
     private final GroupCoordinatorMetrics sensors;
+    private final Heartbeat heartbeat;
     protected final String groupId;
     protected final ConsumerNetworkClient client;
     protected final Time time;
     protected final long retryBackoffMs;
 
-    private boolean needsJoinPrepare = true;
+    private HeartbeatThread heartbeatThread = null;
     private boolean rejoinNeeded = true;
-    protected Node coordinator;
-    protected String memberId;
-    protected String protocol;
-    protected int generation;
+    private boolean needsJoinPrepare = true;
+    private MemberState state = MemberState.UNJOINED;
+    private RequestFuture<ByteBuffer> joinFuture = null;
+    private Node coordinator = null;
+    private Generation generation = Generation.NO_GENERATION;
 
     private RequestFuture<Void> findCoordinatorFuture = null;
 
@@ -105,6 +119,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     public AbstractCoordinator(ConsumerNetworkClient client,
                                String groupId,
+                               int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
                                int heartbeatIntervalMs,
                                Metrics metrics,
@@ -113,19 +128,16 @@ public abstract class AbstractCoordinator implements Closeable {
                                long retryBackoffMs) {
         this.client = client;
         this.time = time;
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
         this.groupId = groupId;
-        this.coordinator = null;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
-        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
-        this.heartbeatTask = new HeartbeatTask();
+        this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
 
     /**
-     * Unique identifier for the class of protocols implements (e.g. "consumer" or "connect").
+     * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
      * @return Non-null protocol type name
      */
     protected abstract String protocolType();
@@ -175,7 +187,7 @@ public abstract class AbstractCoordinator implements Closeable {
     /**
      * Block until the coordinator for this group is known and is ready to receive requests.
      */
-    public void ensureCoordinatorReady() {
+    public synchronized void ensureCoordinatorReady() {
         while (coordinatorUnknown()) {
             RequestFuture<Void> future = lookupCoordinator();
             client.poll(future);
@@ -216,14 +228,44 @@ public abstract class AbstractCoordinator implements Closeable {
      * Check whether the group should be rejoined (e.g. if metadata changes)
      * @return true if it should, false otherwise
      */
-    protected boolean needRejoin() {
+    protected synchronized boolean needRejoin() {
         return rejoinNeeded;
     }
 
     /**
+     * Check the status of the heartbeat thread (if it is active) and indicate the liveness
+     * of the client. This must be called periodically after joining with {@link #ensureActiveGroup()}
+     * to ensure that the member stays in the group. If an interval of time longer than the
+     * provided rebalance timeout expires without calling this method, then the client will proactively
+     * leave the group.
+     * @param now current time in milliseconds
+     * @throws RuntimeException for unexpected errors raised from the heartbeat thread
+     */
+    protected synchronized void pollHeartbeat(long now) {
+        if (heartbeatThread != null) {
+            if (heartbeatThread.hasFailed()) {
+                // set the heartbeat thread to null and raise an exception. If the user catches it,
+                // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
+                RuntimeException cause = heartbeatThread.failureCause();
+                heartbeatThread = null;
+                throw cause;
+            }
+
+            heartbeat.poll(now);
+        }
+    }
+
+    protected synchronized long timeToNextHeartbeat(long now) {
+        // if we have not joined the group, we don't need to send heartbeats
+        if (state == MemberState.UNJOINED)
+            return Long.MAX_VALUE;
+        return heartbeat.timeToNextHeartbeat(now);
+    }
+
+    /**
      * Ensure that the group is active (i.e. joined and synced)
      */
-    public void ensureActiveGroup() {
+    public synchronized void ensureActiveGroup() {
         // always ensure that the coordinator is ready because we may have been disconnected
         // when sending heartbeats and does not necessarily require us to rejoin the group.
         ensureCoordinatorReady();
@@ -231,11 +273,18 @@ public abstract class AbstractCoordinator implements Closeable {
         if (!needRejoin())
             return;
 
+        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
+        // time if the client is woken up before a pending rebalance completes.
         if (needsJoinPrepare) {
-            onJoinPrepare(generation, memberId);
+            onJoinPrepare(generation.generationId, generation.memberId);
             needsJoinPrepare = false;
         }
 
+        if (heartbeatThread == null) {
+            heartbeatThread = new HeartbeatThread();
+            heartbeatThread.start();
+        }
+
         while (needRejoin()) {
             ensureCoordinatorReady();
 
@@ -246,23 +295,41 @@ public abstract class AbstractCoordinator implements Closeable {
                 continue;
             }
 
-            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
-            future.addListener(new RequestFutureListener<ByteBuffer>() {
-                @Override
-                public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the callback will be invoked
-                    // even if the consumer is woken up before finishing the rebalance
-                    onJoinComplete(generation, memberId, protocol, value);
-                    needsJoinPrepare = true;
-                    heartbeatTask.reset();
-                }
+            // we store the join future in case we are woken up by the user after beginning the
+            // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
+            // to rejoin before the pending rebalance has completed.
+            if (joinFuture == null) {
+                state = MemberState.REBALANCING;
+                joinFuture = sendJoinGroupRequest();
+                joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
+                    @Override
+                    public void onSuccess(ByteBuffer value) {
+                        // handle join completion in the callback so that the callback will be invoked
+                        // even if the consumer is woken up before finishing the rebalance
+                        synchronized (AbstractCoordinator.this) {
+                            log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
+                            joinFuture = null;
+                            state = MemberState.STABLE;
+                            needsJoinPrepare = true;
+                            heartbeatThread.enable();
+                        }
 
-                @Override
-                public void onFailure(RuntimeException e) {
-                    // we handle failures below after the request finishes. if the join completes
-                    // after having been woken up, the exception is ignored and we will rejoin
-                }
-            });
+                        onJoinComplete(generation.generationId, generation.memberId, generation.protocol, value);
+                    }
+
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        // we handle failures below after the request finishes. if the join completes
+                        // after having been woken up, the exception is ignored and we will rejoin
+                        synchronized (AbstractCoordinator.this) {
+                            joinFuture = null;
+                            state = MemberState.UNJOINED;
+                        }
+                    }
+                });
+            }
+
+            RequestFuture<ByteBuffer> future = joinFuture;
             client.poll(future);
 
             if (future.failed()) {
@@ -278,63 +345,6 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    private class HeartbeatTask implements DelayedTask {
-
-        private boolean requestInFlight = false;
-
-        public void reset() {
-            // start or restart the heartbeat task to be executed at the next chance
-            long now = time.milliseconds();
-            heartbeat.resetSessionTimeout(now);
-            client.unschedule(this);
-
-            if (!requestInFlight)
-                client.schedule(this, now);
-        }
-
-        @Override
-        public void run(final long now) {
-            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
-                // no need to send the heartbeat we're not using auto-assignment or if we are
-                // awaiting a rebalance
-                return;
-            }
-
-            if (heartbeat.sessionTimeoutExpired(now)) {
-                // we haven't received a successful heartbeat in one session interval
-                // so mark the coordinator dead
-                coordinatorDead();
-                return;
-            }
-
-            if (!heartbeat.shouldHeartbeat(now)) {
-                // we don't need to heartbeat now, so reschedule for when we do
-                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
-            } else {
-                heartbeat.sentHeartbeat(now);
-                requestInFlight = true;
-
-                RequestFuture<Void> future = sendHeartbeatRequest();
-                future.addListener(new RequestFutureListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        requestInFlight = false;
-                        long now = time.milliseconds();
-                        heartbeat.receiveHeartbeat(now);
-                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
-                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
-                    }
-
-                    @Override
-                    public void onFailure(RuntimeException e) {
-                        requestInFlight = false;
-                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
-                    }
-                });
-            }
-        }
-    }
-
     /**
      * Join the group and return the assignment for the next generation. This function handles both
      * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
@@ -350,7 +360,8 @@ public abstract class AbstractCoordinator implements Closeable {
         JoinGroupRequest request = new JoinGroupRequest(
                 groupId,
                 this.sessionTimeoutMs,
-                this.memberId,
+                this.rebalanceTimeoutMs,
+                this.generation.memberId,
                 protocolType(),
                 metadata());
 
@@ -359,7 +370,6 @@ public abstract class AbstractCoordinator implements Closeable {
                 .compose(new JoinGroupResponseHandler());
     }
 
-
     private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
 
         @Override
@@ -372,24 +382,32 @@ public abstract class AbstractCoordinator implements Closeable {
             Errors error = Errors.forCode(joinResponse.errorCode());
             if (error == Errors.NONE) {
                 log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
-                AbstractCoordinator.this.memberId = joinResponse.memberId();
-                AbstractCoordinator.this.generation = joinResponse.generationId();
-                AbstractCoordinator.this.rejoinNeeded = false;
-                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                 sensors.joinLatency.record(response.requestLatencyMs());
-                if (joinResponse.isLeader()) {
-                    onJoinLeader(joinResponse).chain(future);
-                } else {
-                    onJoinFollower().chain(future);
+
+                synchronized (AbstractCoordinator.this) {
+                    if (state != MemberState.REBALANCING) {
+                        // if the consumer was woken up before a rebalance completes, we may have already left
+                        // the group. In this case, we do not want to continue with the sync group.
+                        future.raise(new UnjoinedGroupException());
+                    } else {
+                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
+                                joinResponse.memberId(), joinResponse.groupProtocol());
+                        AbstractCoordinator.this.rejoinNeeded = false;
+                        if (joinResponse.isLeader()) {
+                            onJoinLeader(joinResponse).chain(future);
+                        } else {
+                            onJoinFollower().chain(future);
+                        }
+                    }
                 }
             } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                 log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
-                        coordinator);
+                        coordinator());
                 // backoff and retry
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 // reset the member id and retry immediately
-                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                resetGeneration();
                 log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
@@ -415,8 +433,8 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private RequestFuture<ByteBuffer> onJoinFollower() {
         // send follower's sync group with an empty assignment
-        SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
-                memberId, Collections.<String, ByteBuffer>emptyMap());
+        SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId,
+                generation.memberId, Collections.<String, ByteBuffer>emptyMap());
         log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
         return sendSyncGroupRequest(request);
     }
@@ -427,7 +445,7 @@ public abstract class AbstractCoordinator implements Closeable {
             Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                     joinResponse.members());
 
-            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+            SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, groupAssignment);
             log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
             return sendSyncGroupRequest(request);
         } catch (RuntimeException e) {
@@ -454,11 +472,11 @@ public abstract class AbstractCoordinator implements Closeable {
                            RequestFuture<ByteBuffer> future) {
             Errors error = Errors.forCode(syncResponse.errorCode());
             if (error == Errors.NONE) {
-                log.info("Successfully joined group {} with generation {}", groupId, generation);
                 sensors.syncLatency.record(response.requestLatencyMs());
                 future.complete(syncResponse.memberAssignment());
             } else {
-                AbstractCoordinator.this.rejoinNeeded = true;
+                requestRejoin();
+
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -467,7 +485,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
                     log.debug("SyncGroup for group {} failed due to {}", groupId, error);
-                    AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                    resetGeneration();
                     future.raise(error);
                 } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
@@ -499,43 +517,36 @@ public abstract class AbstractCoordinator implements Closeable {
             log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
             GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
             return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
-                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
-                        @Override
-                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
-                            handleGroupMetadataResponse(response, future);
-                        }
-                    });
+                    .compose(new GroupCoordinatorResponseHandler());
         }
     }
 
-    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
-        log.debug("Received group coordinator response {}", resp);
+    private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
+
+        @Override
+        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
+            log.debug("Received group coordinator response {}", resp);
 
-        if (!coordinatorUnknown()) {
-            // We already found the coordinator, so ignore the request
-            future.complete(null);
-        } else {
             GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
             Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
             if (error == Errors.NONE) {
-                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
-                        groupCoordinatorResponse.node().host(),
-                        groupCoordinatorResponse.node().port());
-
-                log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
-
-                client.tryConnect(coordinator);
-
-                // start sending heartbeats only if we have a valid generation
-                if (generation > 0)
-                    heartbeatTask.reset();
+                synchronized (AbstractCoordinator.this) {
+                    AbstractCoordinator.this.coordinator = new Node(
+                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
+                            groupCoordinatorResponse.node().host(),
+                            groupCoordinatorResponse.node().port());
+                    log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
+                    client.tryConnect(coordinator);
+                    heartbeat.resetTimeouts(time.milliseconds());
+                }
                 future.complete(null);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
+                log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
                 future.raise(error);
             }
         }
@@ -546,21 +557,25 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return true if the coordinator is unknown
      */
     public boolean coordinatorUnknown() {
-        if (coordinator == null)
-            return true;
+        return coordinator() == null;
+    }
 
-        if (client.connectionFailed(coordinator)) {
+    /**
+     * Get the current coordinator
+     * @return the current coordinator or null if it is unknown
+     */
+    protected synchronized Node coordinator() {
+        if (coordinator != null && client.connectionFailed(coordinator)) {
             coordinatorDead();
-            return true;
+            return null;
         }
-
-        return false;
+        return this.coordinator;
     }
 
     /**
      * Mark the current coordinator as dead.
      */
-    protected void coordinatorDead() {
+    protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
             client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
@@ -569,50 +584,56 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     /**
+     * Get the current generation state if the group is stable.
+     * @return the current generation or null if the group is unjoined/rebalancing
+     */
+    protected synchronized Generation generation() {
+        if (this.state != MemberState.STABLE)
+            return null;
+        return generation;
+    }
+
+    /**
+     * Reset the generation and memberId because we have fallen out of the group.
+     */
+    protected synchronized void resetGeneration() {
+        this.generation = Generation.NO_GENERATION;
+        this.rejoinNeeded = true;
+        this.state = MemberState.UNJOINED;
+    }
+
+    protected synchronized void requestRejoin() {
+        this.rejoinNeeded = true;
+    }
+
+    /**
      * Close the coordinator, waiting if needed to send LeaveGroup.
      */
     @Override
-    public void close() {
-        // we do not need to re-enable wakeups since we are closing already
-        client.disableWakeups();
+    public synchronized void close() {
+        if (heartbeatThread != null)
+            heartbeatThread.close();
         maybeLeaveGroup();
     }
 
     /**
      * Leave the current group and reset local generation/memberId.
      */
-    public void maybeLeaveGroup() {
-        client.unschedule(heartbeatTask);
-        if (!coordinatorUnknown() && generation > 0) {
+    public synchronized void maybeLeaveGroup() {
+        if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            sendLeaveGroupRequest();
+            LeaveGroupRequest request = new LeaveGroupRequest(groupId, generation.memberId);
+            client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
+                    .compose(new LeaveGroupResponseHandler());
+            client.pollNoWakeup();
         }
 
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
-        rejoinNeeded = true;
-    }
-
-    private void sendLeaveGroupRequest() {
-        LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId);
-        RequestFuture<Void> future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
-                .compose(new LeaveGroupResponseHandler());
-
-        future.addListener(new RequestFutureListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {}
-
-            @Override
-            public void onFailure(RuntimeException e) {
-                log.debug("LeaveGroup request for group {} failed with error", groupId, e);
-            }
-        });
-
-        client.poll(future, 0);
+        resetGeneration();
     }
 
     private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
+
         @Override
         public LeaveGroupResponse parse(ClientResponse response) {
             return new LeaveGroupResponse(response.responseBody());
@@ -620,25 +641,26 @@ public abstract class AbstractCoordinator implements Closeable {
 
         @Override
         public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
-            // process the response
-            short errorCode = leaveResponse.errorCode();
-            if (errorCode == Errors.NONE.code())
+            Errors error = Errors.forCode(leaveResponse.errorCode());
+            if (error == Errors.NONE) {
+                log.debug("LeaveGroup request for group {} returned successfully", groupId);
                 future.complete(null);
-            else
-                future.raise(Errors.forCode(errorCode));
+            } else {
+                log.debug("LeaveGroup request for group {} failed with error: {}", groupId, error.message());
+                future.raise(error);
+            }
         }
     }
 
-    /**
-     * Send a heartbeat request now (visible only for testing).
-     */
-    public RequestFuture<Void> sendHeartbeatRequest() {
-        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
+    // visible for testing
+    synchronized RequestFuture<Void> sendHeartbeatRequest() {
+        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
         return client.send(coordinator, ApiKeys.HEARTBEAT, req)
-                .compose(new HeartbeatCompletionHandler());
+                .compose(new HeartbeatResponseHandler());
     }
 
-    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+    private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+
         @Override
         public HeartbeatResponse parse(ClientResponse response) {
             return new HeartbeatResponse(response.responseBody());
@@ -654,21 +676,20 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                 log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
-                        groupId, coordinator);
+                        groupId, coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
-                AbstractCoordinator.this.rejoinNeeded = true;
+                requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
                 log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
-                AbstractCoordinator.this.rejoinNeeded = true;
+                resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
-                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
-                AbstractCoordinator.this.rejoinNeeded = true;
+                resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
@@ -678,8 +699,7 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    protected abstract class CoordinatorResponseHandler<R, T>
-            extends RequestFutureAdapter<ClientResponse, T> {
+    protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> {
         protected ClientResponse response;
 
         public abstract R parse(ClientResponse response);
@@ -758,9 +778,149 @@ public abstract class AbstractCoordinator implements Closeable {
                 };
             metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
                 this.metricGrpName,
-                "The number of seconds since the last controller heartbeat"),
+                "The number of seconds since the last controller heartbeat was sent"),
                 lastHeartbeat);
         }
     }
 
+    private class HeartbeatThread extends Thread {
+        private boolean enabled = false;
+        private boolean closed = false;
+        private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
+
+        public void enable() {
+            synchronized (AbstractCoordinator.this) {
+                this.enabled = true;
+                heartbeat.resetTimeouts(time.milliseconds());
+                AbstractCoordinator.this.notify();
+            }
+        }
+
+        public void disable() {
+            synchronized (AbstractCoordinator.this) {
+                this.enabled = false;
+            }
+        }
+
+        public void close() {
+            synchronized (AbstractCoordinator.this) {
+                this.closed = true;
+                AbstractCoordinator.this.notify();
+            }
+        }
+
+        private boolean hasFailed() {
+            return failed.get() != null;
+        }
+
+        private RuntimeException failureCause() {
+            return failed.get();
+        }
+
+        @Override
+        public void run() {
+            try {
+                RequestFuture findCoordinatorFuture = null;
+
+                while (true) {
+                    synchronized (AbstractCoordinator.this) {
+                        if (closed)
+                            return;
+
+                        if (!enabled) {
+                            AbstractCoordinator.this.wait();
+                            continue;
+                        }
+
+                        if (state != MemberState.STABLE) {
+                            // the group is not stable (perhaps because we left the group or because the coordinator
+                            // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
+                            disable();
+                            continue;
+                        }
+
+                        client.pollNoWakeup();
+                        long now = time.milliseconds();
+
+                        if (coordinatorUnknown()) {
+                            if (findCoordinatorFuture == null || findCoordinatorFuture.isDone())
+                                findCoordinatorFuture = lookupCoordinator();
+                            else
+                                AbstractCoordinator.this.wait(retryBackoffMs);
+                        } else if (heartbeat.sessionTimeoutExpired(now)) {
+                            // the session timeout has expired without seeing a successful heartbeat, so we should
+                            // probably make sure the coordinator is still healthy.
+                            coordinatorDead();
+                        } else if (heartbeat.pollTimeoutExpired(now)) {
+                            // the poll timeout has expired, which means that the foreground thread has stalled
+                            // in between calls to poll(), so we explicitly leave the group.
+                            maybeLeaveGroup();
+                        } else if (!heartbeat.shouldHeartbeat(now)) {
+                            // it is not yet time to send the next heartbeat, so wait (bounded by the retry
+                            // backoff so we re-check promptly if a heartbeat fails or the coordinator disconnects)
+                            AbstractCoordinator.this.wait(retryBackoffMs);
+                        } else {
+                            heartbeat.sentHeartbeat(now);
+
+                            sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
+                                @Override
+                                public void onSuccess(Void value) {
+                                    synchronized (AbstractCoordinator.this) {
+                                        heartbeat.receiveHeartbeat(time.milliseconds());
+                                    }
+                                }
+
+                                @Override
+                                public void onFailure(RuntimeException e) {
+                                    synchronized (AbstractCoordinator.this) {
+                                        if (e instanceof RebalanceInProgressException) {
+                                            // it is valid to continue heartbeating while the group is rebalancing. This
+                                            // ensures that the coordinator keeps the member in the group for as long
+                                            // as the duration of the rebalance timeout. If we stop sending heartbeats,
+                                            // however, then the session timeout may expire before we can rejoin.
+                                            heartbeat.receiveHeartbeat(time.milliseconds());
+                                        } else {
+                                            heartbeat.failHeartbeat();
+
+                                            // wake up the thread if it's sleeping to reschedule the heartbeat
+                                            AbstractCoordinator.this.notify();
+                                        }
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
+                this.failed.set(new RuntimeException(e));
+            } catch (RuntimeException e) {
+                log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+                this.failed.set(e);
+            }
+        }
+
+    }
+
+    protected static class Generation {
+        public static final Generation NO_GENERATION = new Generation(
+                OffsetCommitRequest.DEFAULT_GENERATION_ID,
+                JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                null);
+
+        public final int generationId;
+        public final String memberId;
+        public final String protocol;
+
+        public Generation(int generationId, String memberId, String protocol) {
+            this.generationId = generationId;
+            this.memberId = memberId;
+            this.protocol = protocol;
+        }
+    }
+
+    private static class UnjoinedGroupException extends RetriableException {
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 81a40f1..5fee45a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -18,12 +18,13 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -54,6 +55,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -68,18 +70,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final SubscriptionState subscriptions;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
-    private final AutoCommitTask autoCommitTask;
+    private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
     private final boolean excludeInternalTopics;
 
+    // this collection must be thread-safe because it is modified from the response handler
+    // of offset commit requests, which may be invoked from the heartbeat thread
+    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
+
     private MetadataSnapshot metadataSnapshot;
     private MetadataSnapshot assignmentSnapshot;
+    private long nextAutoCommitDeadline;
 
     /**
      * Initialize the coordination manager.
      */
     public ConsumerCoordinator(ConsumerNetworkClient client,
                                String groupId,
+                               int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
                                int heartbeatIntervalMs,
                                List<PartitionAssignor> assignors,
@@ -91,11 +99,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                long retryBackoffMs,
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
-                               long autoCommitIntervalMs,
+                               int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean excludeInternalTopics) {
         super(client,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 metrics,
@@ -103,26 +112,22 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 time,
                 retryBackoffMs);
         this.metadata = metadata;
-
-        this.metadata.requestUpdate();
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
         this.autoCommitEnabled = autoCommitEnabled;
+        this.autoCommitIntervalMs = autoCommitIntervalMs;
         this.assignors = assignors;
-
-        addMetadataListener();
-
-        if (autoCommitEnabled) {
-            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
-            this.autoCommitTask.reschedule();
-        } else {
-            this.autoCommitTask = null;
-        }
-
+        this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
         this.excludeInternalTopics = excludeInternalTopics;
+
+        if (autoCommitEnabled)
+            this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
+
+        this.metadata.requestUpdate();
+        addMetadataListener();
     }
 
     @Override
@@ -210,8 +215,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         assignor.onAssignment(assignment);
 
         // reschedule the auto commit starting from now
-        if (autoCommitEnabled)
-            autoCommitTask.reschedule();
+        this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
@@ -227,6 +231,54 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    /**
+     * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
+     * has joined the group (if it is using group management). This also handles periodic offset commits
+     * if they are enabled.
+     *
+     * @param now current time in milliseconds
+     */
+    public void poll(long now) {
+        invokeCompletedOffsetCommitCallbacks();
+
+        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
+            ensureCoordinatorReady();
+            now = time.milliseconds();
+        }
+
+        if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
+            // due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
+            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
+            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
+            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
+            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
+            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
+            if (subscriptions.hasPatternSubscription())
+                client.ensureFreshMetadata();
+
+            ensureActiveGroup();
+            now = time.milliseconds();
+        }
+
+        pollHeartbeat(now);
+        maybeAutoCommitOffsetsAsync(now);
+    }
+
+    /**
+     * Return the time to the next needed invocation of {@link #poll(long)}.
+     * @param now current time in milliseconds
+     * @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
+     */
+    public long timeToNextPoll(long now) {
+        if (!autoCommitEnabled)
+            return timeToNextHeartbeat(now);
+
+        if (now > nextAutoCommitDeadline)
+            return 0;
+
+        return Math.min(nextAutoCommitDeadline - now, timeToNextHeartbeat(now));
+    }
+
     @Override
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
@@ -292,7 +344,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    public boolean needRejoin() {
+    protected boolean needRejoin() {
         return subscriptions.partitionsAutoAssigned() &&
                 (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
     }
@@ -336,24 +388,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    /**
-     * Ensure that we have a valid partition assignment from the coordinator.
-     */
-    public void ensurePartitionAssignment() {
-        if (subscriptions.partitionsAutoAssigned()) {
-            // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
-            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
-            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
-            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
-            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
-            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
-            if (subscriptions.hasPatternSubscription())
-                client.ensureFreshMetadata();
-
-            ensureActiveGroup();
-        }
-    }
-
     @Override
     public void close() {
         // we do not need to re-enable wakeups since we are closing already
@@ -365,8 +399,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    // visible for testing
+    void invokeCompletedOffsetCommitCallbacks() {
+        while (true) {
+            OffsetCommitCompletion completion = completedOffsetCommits.poll();
+            if (completion == null)
+                break;
+            completion.invoke();
+        }
+    }
+
 
     public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+        invokeCompletedOffsetCommitCallbacks();
+
         if (!coordinatorUnknown()) {
             doCommitOffsetsAsync(offsets, callback);
         } else {
@@ -384,7 +430,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    callback.onComplete(offsets, new RetriableCommitFailedException(e));
+                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
                 }
             });
         }
@@ -404,16 +450,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             public void onSuccess(Void value) {
                 if (interceptors != null)
                     interceptors.onCommit(offsets);
-                cb.onComplete(offsets, null);
+
+                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
             }
 
             @Override
             public void onFailure(RuntimeException e) {
-                if (e instanceof RetriableException) {
-                    cb.onComplete(offsets, new RetriableCommitFailedException(e));
-                } else {
-                    cb.onComplete(offsets, e);
-                }
+                Exception commitException = e;
+
+                if (e instanceof RetriableException)
+                    commitException = new RetriableCommitFailedException(e);
+
+                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
             }
         });
     }
@@ -427,6 +475,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
      */
     public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        invokeCompletedOffsetCommitCallbacks();
+
         if (offsets.isEmpty())
             return;
 
@@ -449,46 +499,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    private class AutoCommitTask implements DelayedTask {
-        private final long interval;
-
-        public AutoCommitTask(long interval) {
-            this.interval = interval;
-        }
-
-        private void reschedule() {
-            client.schedule(this, time.milliseconds() + interval);
-        }
-
-        private void reschedule(long at) {
-            client.schedule(this, at);
-        }
-
-        public void run(final long now) {
+    private void maybeAutoCommitOffsetsAsync(long now) {
+        if (autoCommitEnabled) {
             if (coordinatorUnknown()) {
-                log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
-                reschedule(now + retryBackoffMs);
-                return;
-            }
-
-            if (needRejoin()) {
-                // skip the commit when we're rejoining since we'll commit offsets synchronously
-                // before the revocation callback is invoked
-                reschedule(now + interval);
-                return;
-            }
-
-            commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
-                @Override
-                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                    if (exception == null) {
-                        reschedule(now + interval);
-                    } else {
-                        log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
-                        reschedule(now + interval);
+                this.nextAutoCommitDeadline = now + retryBackoffMs;
+            } else if (now >= nextAutoCommitDeadline) {
+                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
+                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                        if (exception != null) {
+                            log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
+                            if (exception instanceof RetriableException)
+                                nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+                        } else {
+                            log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
+                        }
                     }
-                }
-            });
+                });
+            }
         }
     }
 
@@ -506,6 +535,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            if (exception != null)
+                log.error("Offset commit failed.", exception);
+        }
+    }
+
     /**
      * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
      * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
@@ -515,12 +552,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return A request future whose value indicates whether the commit was successful or not
      */
     private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
         if (offsets.isEmpty())
             return RequestFuture.voidSuccess();
 
+        Node coordinator = coordinator();
+        if (coordinator == null)
+            return RequestFuture.coordinatorNotAvailable();
+
         // create the offset commit request
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
@@ -529,9 +567,21 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
         }
 
-        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
-                this.generation,
-                this.memberId,
+        final Generation generation;
+        if (subscriptions.partitionsAutoAssigned())
+            generation = generation();
+        else
+            generation = Generation.NO_GENERATION;
+
+        // if the generation is null, we are not part of an active group (and we expect to be).
+        // the only thing we can do is fail the commit and let the user rejoin the group in poll()
+        if (generation == null)
+            return RequestFuture.failure(new CommitFailedException());
+
+        OffsetCommitRequest req = new OffsetCommitRequest(
+                this.groupId,
+                generation.generationId,
+                generation.memberId,
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
@@ -541,14 +591,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 .compose(new OffsetCommitResponseHandler(offsets));
     }
 
-    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-            if (exception != null)
-                log.error("Offset commit failed.", exception);
-        }
-    }
-
     private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
 
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
@@ -607,13 +649,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         || error == Errors.REBALANCE_IN_PROGRESS) {
                     // need to re-join group
                     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
-                    subscriptions.needReassignment();
-                    future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
-                            "rebalanced and assigned the partitions to another member. This means that the time " +
-                            "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
-                            "which typically implies that the poll loop is spending too much time message processing. " +
-                            "You can address this either by increasing the session timeout or by reducing the maximum " +
-                            "size of batches returned in poll() with max.poll.records."));
+                    resetGeneration();
+                    future.raise(new CommitFailedException());
                     return;
                 } else {
                     log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
@@ -639,7 +676,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return A request future containing the committed offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        if (coordinatorUnknown())
+        Node coordinator = coordinator();
+        if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
         log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
@@ -675,11 +713,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         // re-discover the coordinator and retry
                         coordinatorDead();
                         future.raise(error);
-                    } else if (error == Errors.UNKNOWN_MEMBER_ID
-                            || error == Errors.ILLEGAL_GENERATION) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                        future.raise(error);
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                     }
@@ -753,5 +786,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    private static class OffsetCommitCompletion {
+        private final OffsetCommitCallback callback;
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private final Exception exception;
+
+        public OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            this.callback = callback;
+            this.offsets = offsets;
+            this.exception = exception;
+        }
+
+        public void invoke() {
+            callback.onComplete(offsets, exception);
+        }
+    }
 
 }


Mime
View raw message