kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
Date Tue, 27 Feb 2018 08:30:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378231#comment-16378231 ]

ASF GitHub Bot commented on KAFKA-6534:
---------------------------------------

guozhangwang closed pull request #4544: KAFKA-6534: Enforce a rebalance in the next poll call when encountering a task migration
URL: https://github.com/apache/kafka/pull/4544

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0dff7..b39f52c0552 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -767,20 +767,20 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu
                 future.complete(null);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
-                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
+                log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
                         coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.debug("Attempt to heartbeat failed since group is rebalancing");
+                log.info("Attempt to heartbeat failed since group is rebalancing");
                 requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
+                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
+                log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                 resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..a8f7e652da9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,6 +52,7 @@
     final Logger log;
     final LogContext logContext;
     boolean taskInitialized;
+    boolean taskClosed;
     final StateDirectory stateDirectory;
 
     InternalProcessorContext processorContext;
@@ -256,6 +257,9 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep
         }
     }
 
+    public boolean isClosed() {
+        return taskClosed;
+    }
 
     public boolean hasStateStores() {
         return !topology.stateStores().isEmpty();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f6488e7..f98e6356a22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -95,6 +95,8 @@ int process() {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
@@ -125,6 +127,8 @@ int punctuate() {
                     punctuated++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2cd82f461dd..8529c9eca88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -200,6 +200,8 @@ private RuntimeException suspendTasks(final Collection<T> tasks) {
                 suspended.put(task.id(), task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
+                log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and move on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
                 it.remove();
             } catch (final RuntimeException e) {
@@ -216,7 +218,6 @@ private RuntimeException suspendTasks(final Collection<T> tasks) {
     }
 
     RuntimeException closeZombieTask(final T task) {
-        log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
         } catch (final RuntimeException e) {
@@ -242,11 +243,12 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
                 try {
                     task.resume();
                 } catch (final TaskMigratedException e) {
+                    log.info("Failed to resume {} {} since it got migrated to another thread already. " +
+                            "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
                     if (fatalException != null) {
                         throw fatalException;
                     }
-                    suspended.remove(taskId);
                     throw e;
                 }
                 transitionToRunning(task, new HashSet<TopicPartition>());
@@ -368,14 +370,14 @@ void applyToRunningTasks(final TaskAction<T> action) {
             try {
                 action.apply(task);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to commit {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
                 }
                 it.remove();
-                if (firstException == null) {
-                    firstException = e;
-                }
+                throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to {} {} {} due to the following error:",
                           action.name(),
@@ -416,6 +418,8 @@ void close(final boolean clean) {
             try {
                 task.close(clean, false);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to close {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and move on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
                 log.error("Failed while closing {} {} due to the following error:",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 39d34d799d2..861556cded3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -144,6 +144,8 @@ public void close(final boolean clean,
         } finally {
             closeStateManager(committedSuccessfully);
         }
+
+        taskClosed = true;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6bca02ad9bc..b8777ad5521 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -541,6 +541,8 @@ public void close(boolean clean,
         }
 
         closeSuspended(clean, isZombie, firstException);
+
+        taskClosed = true;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 61a22be5a15..cda04e9efc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -757,7 +757,12 @@ private void runLoop() {
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
                 log.warn("Detected task {} that got migrated to another thread. " +
                     "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
-                    "Trying to rejoin the consumer group now. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+                    "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
+                        ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+
+                // re-subscribe to enforce a rebalance in the next poll call
+                consumer.unsubscribe();
+                consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
             }
         }
     }
@@ -898,6 +903,13 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
 
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
+
+            if (task.isClosed()) {
+                log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                throw new TaskMigratedException(task);
+            }
+
             numAddedRecords += task.addRecords(partition, records.records(partition));
         }
         streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1024,6 +1036,13 @@ private void maybeUpdateStandbyTasks(final long now) {
                         List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
                             final StandbyTask task = taskManager.standbyTask(partition);
+
+                            if (task.isClosed()) {
+                                log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                                throw new TaskMigratedException(task);
+                            }
+
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -1051,6 +1070,12 @@ private void maybeUpdateStandbyTasks(final long now) {
                            throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
                         }
 
+                        if (task.isClosed()) {
+                            log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                                    "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                            throw new TaskMigratedException(task);
+                        }
+
                        final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                         if (remaining != null) {
                             restoreConsumer.pause(singleton(partition));
@@ -1063,6 +1088,13 @@ private void maybeUpdateStandbyTasks(final long now) {
                 final Set<TopicPartition> partitions = recoverableException.partitions();
                 for (final TopicPartition partition : partitions) {
                     final StandbyTask task = taskManager.standbyTask(partition);
+
+                    if (task.isClosed()) {
+                        log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                                "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                        throw new TaskMigratedException(task);
+                    }
+
                     log.info("Reinitializing StandbyTask {}", task);
                     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62ddacfb33c..9f02834dd75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -300,7 +300,6 @@ StreamTask activeTask(final TopicPartition partition) {
         return active.runningTaskFor(partition);
     }
 
-
     StandbyTask standbyTask(final TopicPartition partition) {
         return standby.runningTaskFor(partition);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b43c9d..c4ea9647a8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -61,6 +61,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -78,18 +79,18 @@
     });
 
     private static String applicationId;
+    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String CONSUMER_GROUP_ID = "readCommitted";
     private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
     private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
     private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
-    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
     private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
     private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
     private final String storeName = "store";
 
     private AtomicBoolean errorInjected;
-    private AtomicBoolean injectGC;
+    private AtomicBoolean gcInjected;
     private volatile boolean doGC = true;
     private AtomicInteger commitRequested;
     private Throwable uncaughtException;
@@ -153,7 +154,6 @@ private void runSimpleCopyTest(final int numberOfRestarts,
         output.to(outputTopic);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
-            final long factor = i;
             final KafkaStreams streams = new KafkaStreams(
                 builder.build(),
                 StreamsTestUtils.getStreamsConfig(
@@ -171,7 +171,7 @@ private void runSimpleCopyTest(final int numberOfRestarts,
             try {
                 streams.start();
 
-                final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
+                final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
 
                 IntegrationTestUtils.produceKeyValuesSynchronously(
                     inputTopic,
@@ -510,7 +510,7 @@ public boolean conditionMet() {
             checkResultPerKey(committedRecords, committedDataBeforeGC);
             checkResultPerKey(uncommittedRecords, dataBeforeGC);
 
-            injectGC.set(true);
+            gcInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
 
             TestUtils.waitForCondition(new TestCondition() {
@@ -577,7 +577,7 @@ public boolean conditionMet() {
     private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
-        injectGC = new AtomicBoolean(false);
+        gcInjected = new AtomicBoolean(false);
         final StreamsBuilder builder = new StreamsBuilder();
 
         String[] storeNames = null;
@@ -614,7 +614,7 @@ public void init(final ProcessorContext context) {
                             // only tries to fail once on one of the task
                             throw new RuntimeException("Injected test exception.");
                         }
-                        if (injectGC.compareAndSet(true, false)) {
+                        if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
                                     Thread.sleep(100);
@@ -779,6 +779,8 @@ private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Lon
             }
         }
 
+        assertNotNull(store);
+
         final KeyValueIterator<Long, Long> it = store.all();
         while (it.hasNext()) {
             assertTrue(expectedStoreContent.remove(it.next()));
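
The operative change is in StreamThread.runLoop() above: on a TaskMigratedException the thread now calls unsubscribe() followed by subscribe(), which forces the consumer to re-join the group on its very next poll() instead of waiting for a heartbeat response to surface the rebalance. Below is a minimal, self-contained sketch of that pattern against the plain consumer API; the bootstrap server, group id, topic pattern, and the migrationDetected flag are illustrative stand-ins, not code from the PR:

{code:java}
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ForcedRebalanceSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forced-rebalance-demo");   // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // No-op listener standing in for Streams' rebalanceListener.
        final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { }
        };

        final Pattern sourceTopics = Pattern.compile("input-.*"); // illustrative

        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(sourceTopics, listener);

            boolean migrationDetected = false;
            while (true) {
                if (migrationDetected) {
                    // Re-subscribing resets the subscription state, so the next
                    // poll() re-joins the group right away instead of waiting for
                    // a heartbeat error to report the rebalance.
                    consumer.unsubscribe();
                    consumer.subscribe(sourceTopics, listener);
                    migrationDetected = false;
                }

                final ConsumerRecords<byte[], byte[]> records = consumer.poll(100); // poll(Duration) in newer clients
                // ... process records; set migrationDetected = true when a task
                // turns out to be closed/migrated, as the Streams code above does ...
            }
        }
    }
}
{code}

Streams itself re-subscribes with builder.sourceTopicPattern() and its existing rebalanceListener, as the diff shows; the sketch only mirrors that shape.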


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Consumer.poll may not trigger rebalance in time when there is a task migration
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it always lets that thread call {{consumer.poll}}, hoping the poll will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on heartbeat responses, which leaves a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, the records buffered in the consumer may not be cleaned up and may later be handed to Streams, which then realizes they no longer belong to the thread, causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on; with the default heartbeat.interval.ms value (3 sec), the race likelihood should not be high.
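
The merged patch above closes this race from both ends: a task records that it has been closed (taskClosed / isClosed()), the stream thread checks that flag before handing freshly polled records to a task and throws TaskMigratedException on a hit, and runLoop() reacts by re-subscribing to force a rebalance on the next poll. Below is a minimal sketch of the closed-task guard; Task, RecordDispatcher, and TaskMigratedSignal are hypothetical stand-ins for the Streams internals, not the real classes:

{code:java}
import java.util.List;

/** Hypothetical stand-in for Streams' internal TaskMigratedException. */
class TaskMigratedSignal extends RuntimeException {
    TaskMigratedSignal(final String taskId) {
        super("task " + taskId + " was migrated to another thread");
    }
}

/** Hypothetical, heavily simplified task. */
class Task {
    private final String id;
    private volatile boolean closed; // set at the very end of close(), as in the PR

    Task(final String id) {
        this.id = id;
    }

    void close() {
        // ... commit / release state stores, producers, etc. ...
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    String id() {
        return id;
    }

    void addRecords(final List<byte[]> records) {
        // ... buffer records for later processing ...
    }
}

/** Mirrors the guard added to addRecordsToTasks()/maybeUpdateStandbyTasks(). */
class RecordDispatcher {
    void dispatch(final Task task, final List<byte[]> records) {
        if (task.isClosed()) {
            // The task was closed as a zombie after a migration; signal the
            // polling thread to force a rebalance instead of buffering records
            // into a dead task (the source of the IllegalStateException above).
            throw new TaskMigratedSignal(task.id());
        }
        task.addRecords(records);
    }
}
{code}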



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
