camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sebastian Rühl (JIRA) <>
Subject [jira] [Commented] (CAMEL-9716) Camel KafkaConsumer should be able to re-consume the Kafka stream.
Date Mon, 21 Mar 2016 09:21:25 GMT


Sebastian Rühl commented on CAMEL-9716:

[~davsclaus] The problem is that the shutdown of the executor doesn't kill the thread properly.
I tried to debug last friday why exactly the thread can't be interrupted but couldn't find
it out yet. Because the thread gets never interrupted we never reached the (kafka)consumer.unsubscribe().
I suspected this was the reason we had such a long time waiting till we can resubscribe on
kafka. In normal conditions this isn't a problem beside on a camel-restart where you have
then a zombie thread (and unsubscribe gets never called). Because of the nature of the jUnit
test I have written Im dependent on a proper shutdown of the thread to perform a unsubscribe.
If you read the javadoc of this kafka method you can see that this method is meant to "interrupt"
a long running process. For me this whole looks like a swallowed Interrupted exception in
combination with a loop that prevent the thread from terminating. I need to isolate this issue
with an jUnit test and submit a kafka issue but till then the call of wakeup should help with
this issue.

I found this issue (wakeup etc.) while I was working on the jUnit test so this is the reason
I wrote a separate patch for it (Test requires prober shutdown and the issue this patch fixing
is independent on this issue). 

> Camel KafkaConsumer should be able to re-consume the Kafka stream.
> ------------------------------------------------------------------
>                 Key: CAMEL-9716
>                 URL:
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.17.0
>            Reporter: Sebastian Rühl
>            Priority: Minor
>             Fix For: 2.18.0
>         Attachments: _CAMEL_9716__call_wakeup_on_kafka_consumer_to_enforce_shutdown.patch
> In some corner cases we should be able to re-consume a Kafka stream from the beginning
on startup of the endpoint.
> For this the Kafka-KafkaConsumer has a method
> {code:java}
>    public void seekToBeginning(TopicPartition... partitions) {
> {code}
> {panel:title= Sidenode |borderStyle=dashed|borderColor=#ccc|titleBGColor=#F7D6C1|bgColor=#FFFFCE}
> There are other methods to manipulate offsets but for this ticket we just look on the
re-consume corner case.
> {panel}
> Currently I have implemented this support in a branch here ([])
and have also written an jUnit test.
> -At this time this test still fails for unknown reasons.-
> I would be keen on some feedback on this feature and also would like to advance this
branch to make it mergable. 

This message was sent by Atlassian JIRA

View raw message