[ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182785#comment-16182785
]
Randall Hauch commented on KAFKA-4632:
--------------------------------------
Actually, it looks like at least as far back as 0.10.0.1 the task is indeed [handling a WakeupException|https://github.com/apache/kafka/blob/0.10.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L248].
You seem to have reported multiple versions, but some of those appear to already been fixed.
Which one are you using?
> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4632
> URL: https://issues.apache.org/jira/browse/KAFKA-4632
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
> Reporter: Scott Reynolds
>
> WorkerSinkTask's closePartitions method isn't handling WakeupException that can be thrown
from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup
(ConsumerNetworkClient.java:404)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync
(ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
> at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
> at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe it should catch it and ignore it as that is what the poll method does when
isStopping is true
> {code:java}
> } catch (WakeupException we) {
> log.trace("{} consumer woken up", id);
> if (isStopping())
> return;
> if (shouldPause()) {
> pauseAll();
> } else if (!pausedForRedelivery) {
> resumeAll();
> }
> }
> {code}
> But unsure, love some insight into this.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
|