camel-issues mailing list archives

From "Roger (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CAMEL-11215) Camel Kafka component commits offsets in case of exceptions
Date Tue, 16 May 2017 20:21:04 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013048#comment-16013048 ]

Roger commented on CAMEL-11215:
-------------------------------

Thanks Claus!
We will check out that branch and test it.
When will this be available in a release?


> Camel Kafka component commits offsets in case of exceptions
> -----------------------------------------------------------
>
>                 Key: CAMEL-11215
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11215
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.18.3
>            Reporter: Roger
>            Assignee: Claus Ibsen
>             Fix For: 2.18.4, 2.19.1, 2.20.0
>
>
> My processor in the route throws an exception, but the Kafka component still commits the offsets.
> My route (heavily redacted and modified):
> {code:title=Route|borderStyle=solid}
> from( "kafka://blah-blah" ).routeId("MyRoute")
>                 .convertBodyTo( MyData.class )
>                 .process( "MyProcessor" )
>                 .to( "DestinationProcessor" );
> {code}
> The exception I get: 
> {code:title=Exception|borderStyle=solid}
>         at com.mycompany.MyProcessor.process(MyProcessor.java:152)
>         at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
>         at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
>         at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
>         at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
>         at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
>         at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>         at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.sql.SQLException: Exception occurred while getting connection: oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from Datasource: java.sql.SQLException: Listener refused the connection with the following error:
> ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
> {code}
> Here is the corresponding Kafka component code (KafkaConsumer.java). This part of the code does not seem to handle the exception: the exception handler simply swallows the exception, and the fall-through code happily commits the offsets. Is this a bug, or am I missing something?
> {code:title=KafkaConsumer.java|borderStyle=solid}
> while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
>     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
>     for (TopicPartition partition : allRecords.partitions()) {
>         List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
>             .records(partition);
>         for (ConsumerRecord<Object, Object> record : partitionRecords) {
>             if (LOG.isTraceEnabled()) {
>                 LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
>             }
>             Exchange exchange = endpoint.createKafkaExchange(record);
>             try {
>                 processor.process(exchange);
>             } catch (Exception e) {
>                 getExceptionHandler().handleException("Error during processing", exchange, e);
>             }
>         }
>         // if autocommit is false
>         if (endpoint.getConfiguration().isAutoCommitEnable() != null
>             && !endpoint.getConfiguration().isAutoCommitEnable()) {
>             long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
>             consumer.commitSync(Collections.singletonMap(
>                 partition, new OffsetAndMetadata(partitionLastoffset + 1)));
>         }
>     }
> }
> {code}
> Any insights are appreciated.
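
The behaviour described in the report (the commit runs even after a record fails) can be contrasted with a loop that stops at the first failure and commits only what was actually processed. The following is a minimal sketch in plain Java, not code from the Camel code base and not the fix on the branch mentioned above. The class `OffsetCommitSketch`, the method `nextCommitOffset`, and the simulated processor are all hypothetical names introduced for illustration, and records are reduced to bare offsets so that the example runs without Kafka or Camel on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongConsumer;

/** Illustrative only: a stand-in for the per-partition loop, with no Kafka or Camel types. */
public class OffsetCommitSketch {

    /**
     * Processes offsets in order and stops at the first failure.
     * Returns the offset to commit (last successfully processed offset + 1),
     * or empty if nothing succeeded and nothing should be committed.
     */
    public static OptionalLong nextCommitOffset(List<Long> offsets, LongConsumer processor) {
        OptionalLong commit = OptionalLong.empty();
        for (long offset : offsets) {
            try {
                processor.accept(offset);
            } catch (RuntimeException e) {
                // Unlike the quoted loop, do not fall through to a commit
                // covering the whole batch: stop so the failed record is not skipped.
                break;
            }
            // Kafka convention: the committed offset is the next one to read.
            commit = OptionalLong.of(offset + 1);
        }
        return commit;
    }

    public static void main(String[] args) {
        List<Long> batch = Arrays.asList(10L, 11L, 12L, 13L);
        // Hypothetical processor that fails on offset 12, e.g. during a database outage.
        LongConsumer failsAt12 = offset -> {
            if (offset == 12L) {
                throw new IllegalStateException("simulated processing failure");
            }
        };
        System.out.println(nextCommitOffset(batch, failsAt12));
    }
}
```

With the sample batch, offsets 10 and 11 succeed, so the value to commit is 12 and the failed record would be read again after a restart or seek. Whether to stop, retry, or skip on failure is a design choice that depends on the route; this sketch only shows the "stop and do not commit past the failure" option.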



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
