camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael J. Kitchin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CAMEL-8975) Message loss with batch commit
Date Thu, 16 Jul 2015 00:41:04 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Michael J. Kitchin updated CAMEL-8975:
--------------------------------------
    Description: 
These issues center around Kafka consumer (KafaConsumer.java, line numbers below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint exception)
## In-flight exchange loss on Camel context/runtime shutdown (i.e., route interrupted ->
exception suppressed -> offset committed)
# BatchCommitConsumerTask activations are unbalanced during periods of low activity, meaning:
## await() (:165) will timeout for active BatchCommitConsumerTask(s) when other consumer threads
are binding on it.hasNext() (:145) (blocking call, despite no @throws)
## Any, previously-activated await()'ing thread will (a) get a TimeoutExeception, (b) loop,
and (c) get a BrokenBarrierException on the next await() call and (d) exit
## Process will repeat until (a) all consumer stream threads have exited, (b) leaving consumer
dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
# An ExecutorService is obtained from Camel to handle KafkaStreams with # of threads set to
the consumerStreams param (:77). Since the # of KafkaStreams actually created is (consumersCount
* consumerStreams) and executor runnables are indefinite loops, a random selection of streams
will not be serviced if consumersCount>1.

Source code URL:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

We've troubleshot this extensively and reimplemented the KafkaConsumer class with params added
to KafkaConfiguration to address these concerns and are happy to submit these back to the
community, if interested.

  was:
These issues center around Kafka consumer (KafaConsumer.java, line numberrs below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint exception)
## In-flight exchange loss on Camel context/runtime shutdown (i.e., route interrupted ->
exception suppressed -> offset committed)
# BatchCommitConsumerTask activations are unbalanced during periods of low activity, meaning:
## await() (:165) will timeout for active BatchCommitConsumerTask(s) when other consumer threads
are binding on it.hasNext() (:145) (blocking call, despite no @throws)
## Any, previously-activated await()'ing thread will (a) get a TimeoutExeception, (b) loop,
and (c) get a BrokenBarrierException on the next await() call and (d) exit
## Process will repeat until (a) all consumer stream threads have exited, (b) leaving consumer
dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
# An ExecutorService is obtained from Camel to handle KafkaStreams with # of threads set to
the consumerStreams param (:77). Since the # of KafkaStreams actually created is (consumersCount
* consumerStreams) and executor runnables are indefinite loops, a random selection of streams
will not be serviced if consumersCount>1.

Source code URL:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

We've troubleshot this extensively and reimplemented the KafkaConsumer class with params added
to KafkaConfiguration to address these concerns and are happy to submit these back to the
community, if interested.


> Message loss with batch commit
> ------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Unbuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center around Kafka consumer (KafaConsumer.java, line numbers below):
> # Exchange exceptions/failures ignored at process() (:148), meaning:
> ## Automatic offset commit on exchange failure (e.g., processor/endpoint exception)
> ## In-flight exchange loss on Camel context/runtime shutdown (i.e., route interrupted
-> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low activity,
meaning:
> ## await() (:165) will timeout for active BatchCommitConsumerTask(s) when other consumer
threads are binding on it.hasNext() (:145) (blocking call, despite no @throws)
> ## Any, previously-activated await()'ing thread will (a) get a TimeoutExeception, (b)
loop, and (c) get a BrokenBarrierException on the next await() call and (d) exit
> ## Process will repeat until (a) all consumer stream threads have exited, (b) leaving
consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams with # of threads
set to the consumerStreams param (:77). Since the # of KafkaStreams actually created is (consumersCount
* consumerStreams) and executor runnables are indefinite loops, a random selection of streams
will not be serviced if consumersCount>1.
> Source code URL:
> - https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class with params
added to KafkaConfiguration to address these concerns and are happy to submit these back to
the community, if interested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message