kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4632) Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
Date Wed, 27 Sep 2017 16:02:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182785#comment-16182785 ]

Randall Hauch commented on KAFKA-4632:
--------------------------------------

Actually, it looks like at least as far back as 0.10.0.1 the task is indeed [handling a WakeupException|https://github.com/apache/kafka/blob/0.10.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L248].
You seem to have reported multiple versions, but some of those appear to have already been fixed.
Which one are you using?

> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4632
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
>            Reporter: Scott Reynolds
>
> WorkerSinkTask's closePartitions method isn't handling WakeupException that can be thrown from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
> at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
> at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe it should catch it and ignore it, as that is what the poll method does when isStopping is true:
> {code:java}
>         } catch (WakeupException we) {
>             log.trace("{} consumer woken up", id);
>             if (isStopping())
>                 return;
>             if (shouldPause()) {
>                 pauseAll();
>             } else if (!pausedForRedelivery) {
>                 resumeAll();
>             }
>         }
> {code}
> But I'm unsure and would love some insight into this.
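
For illustration only, the reporter's suggestion (catch the WakeupException raised from commitSync while closing partitions, and ignore it only when the task is stopping) might be sketched as below. This is not the actual WorkerSinkTask code. StubWakeupException, StubConsumer and SinkTaskSketch are hypothetical stand-ins so that the sketch compiles without the Kafka client on the classpath, and the real consumer behaves in ways this stub does not model.

```java
// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class StubWakeupException extends RuntimeException {
}

// Hypothetical stand-in for the consumer: commitSync() throws once if a
// wakeup has been requested, otherwise it counts a successful commit.
class StubConsumer {
    private boolean wakeupPending;
    int commits;

    void wakeup() {
        wakeupPending = true;
    }

    void commitSync() {
        if (wakeupPending) {
            wakeupPending = false;
            throw new StubWakeupException();
        }
        commits++;
    }
}

// Hypothetical sketch of a sink task whose closePartitions() tolerates a
// wakeup during shutdown, mirroring the isStopping() check quoted above.
class SinkTaskSketch {
    private final StubConsumer consumer;
    private volatile boolean stopping;

    SinkTaskSketch(StubConsumer consumer) {
        this.consumer = consumer;
    }

    // Assumed shutdown path: mark the task as stopping and wake the consumer.
    void stop() {
        stopping = true;
        consumer.wakeup();
    }

    // Returns true if the commit completed, false if it was interrupted by a
    // wakeup while stopping. A wakeup outside of shutdown is rethrown.
    boolean closePartitions() {
        try {
            consumer.commitSync();
            return true;
        } catch (StubWakeupException we) {
            if (stopping) {
                return false; // ignore: the task is shutting down anyway
            }
            throw we;
        }
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        SinkTaskSketch task = new SinkTaskSketch(consumer);
        System.out.println("commit completed: " + task.closePartitions());
        task.stop();
        System.out.println("commit completed after stop: " + task.closePartitions());
    }
}
```

One trade-off of ignoring the exception this way is that the interrupted commit is skipped, so the final offsets may not be committed. Retrying the commit before giving up would be an alternative.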



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
