camel-issues mailing list archives

From "Claus Ibsen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CAMEL-8975) camel-kafka - Message loss with batch commit
Date Wed, 29 Jul 2015 12:26:05 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-8975:
-------------------------------
    Summary: camel-kafka - Message loss with batch commit  (was: Message loss with batch commit)

> camel-kafka - Message loss with batch commit
> --------------------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Ubuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center on the Kafka consumer (KafkaConsumer.java; line numbers given below):
> # Exchange exceptions/failures are ignored at process() (:148), meaning:
> ## The offset is committed automatically even when the exchange fails (e.g., processor/endpoint exception)
> ## In-flight exchanges are lost on Camel context/runtime shutdown (i.e., route interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) while other consumer threads are blocked on it.hasNext() (:145) (a blocking call, despite no @throws)
> ## Any previously activated await()'ing thread will (a) get a TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next await() call, and (d) exit
> ## The process repeats until all consumer stream threads have exited, leaving the consumer dead
> ## This is aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams, with the number of threads set to the consumerStreams param (:77). Since the number of KafkaStreams actually created is (consumersCount * consumerStreams) and the executor runnables are indefinite loops, a random selection of streams will not be serviced if consumersCount>1.
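
The thread-starvation point in the last item can be reproduced without Kafka or Camel. The sketch below is only an illustration under stated assumptions: a plain `Executors.newFixedThreadPool` stands in for the ExecutorService obtained from Camel, and the class name `StarvedStreamsSketch`, the method `countServicedStreams`, and the latch that replaces the never-ending consume loop are all hypothetical names, not part of camel-kafka.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StarvedStreamsSketch {

    /**
     * Submits streamCount never-ending tasks to a pool of poolSize threads
     * and reports how many of them ever started running.
     */
    static int countServicedStreams(int poolSize, int streamCount) throws InterruptedException {
        // Assumption: a fixed pool stands in for the executor Camel hands out.
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<Integer> serviced = ConcurrentHashMap.newKeySet();
        CountDownLatch started = new CountDownLatch(Math.min(poolSize, streamCount));
        CountDownLatch stop = new CountDownLatch(1); // never counted down

        for (int i = 0; i < streamCount; i++) {
            final int streamId = i;
            executor.submit(() -> {
                serviced.add(streamId);
                started.countDown();
                try {
                    stop.await(); // stands in for the indefinite consume loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Wait until every pool thread is occupied, then give queued tasks
        // a grace period in which they could start if a thread were free.
        started.await(5, TimeUnit.SECONDS);
        Thread.sleep(100);
        int count = serviced.size();

        executor.shutdownNow(); // interrupts running tasks, discards queued ones
        executor.awaitTermination(1, TimeUnit.SECONDS);
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        // e.g. consumersCount=3, consumerStreams=2: six streams, two threads
        System.out.println(countServicedStreams(2, 6)); // prints 2
    }
}
```

Because the running tasks never return, the remaining four tasks stay in the executor's queue and are never serviced. Sizing the pool to the total number of streams would avoid this in the sketch; whether the reporter's reimplementation does that is not stated in the issue.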
> Source code URL:
> - https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class, with params added to KafkaConfiguration, to address these concerns. We are happy to submit these changes back to the community, if interested.
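
The TimeoutException followed by BrokenBarrierException sequence described in the second item is standard `java.util.concurrent.CyclicBarrier` behavior and can be shown in isolation. The following is a minimal sketch, not the camel-kafka code: the class `BarrierTimeoutSketch`, the method `awaitTwice`, and the 50 ms timeout are hypothetical, and a two-party barrier with a single arriving thread stands in for the case where the other consumer threads are blocked elsewhere.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BarrierTimeoutSketch {

    /**
     * Calls await() twice on a barrier that no other party ever reaches
     * and records what each call did.
     */
    static String[] awaitTwice(CyclicBarrier barrier, long timeoutMillis) throws InterruptedException {
        String[] outcomes = new String[2];
        for (int i = 0; i < 2; i++) {
            try {
                barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
                outcomes[i] = "passed";
            } catch (TimeoutException | BrokenBarrierException e) {
                // The first timeout breaks the barrier for every later caller.
                outcomes[i] = e.getClass().getSimpleName();
            }
        }
        return outcomes;
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // second party never arrives
        String[] outcomes = awaitTwice(barrier, 50);
        System.out.println(outcomes[0] + ", " + outcomes[1]);
        // prints "TimeoutException, BrokenBarrierException"
        System.out.println(barrier.isBroken()); // prints true
    }
}
```

Once a timed `await()` expires, the barrier stays broken until `reset()` is called, so any loop that simply retries `await()` fails immediately on the next pass. That matches the exit path described above; how the reporter's reimplementation handles it is not specified in the issue.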



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
