camel-users mailing list archives

From Yacov Schondorf <yacov.schond...@gmail.com>
Subject Re: Camel: detect Kafka not available
Date Sun, 07 Jan 2018 11:09:56 GMT
Just for the record, I have tried adding "bridgeErrorHandler=true" to the
route. This does not help, and Camel still tries to connect to the
non-existent Kafka broker. The final route looks like this:


from("kafka:{{consumer.topic}}?bridgeErrorHandler=true&brokers={{kafka.host}}:{{kafka.port}}"
                        + "&maxPollRecords={{consumer.maxPollRecords}}"
                        + "&consumersCount={{consumer.consumersCount}}"
                        + "&seekTo={{consumer.seekTo}}"
                        + "&groupId={{consumer.group}}")

I was expecting an exception to be thrown, but this does not happen. This is based on
https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
with just an additional "bridgeErrorHandler=true" added to the route.


2017-12-03 14:54 GMT+02:00 Yacov Schondorf <yacov.schondorf@gmail.com>:

> No solution by Camel for detecting connection errors? I gave a very clear
> reproducible scenario...
>
> 2017-11-28 11:44 GMT+02:00 Yacov Schondorf <yacov.schondorf@gmail.com>:
>
>> But this is exactly my point - there is no stack trace! I want a stack
>> trace to be thrown so that I can catch it using the regular error handler.
>> That is the purpose of the call to endpoint1.setBridgeErrorHandler(true).
>> However, the call does not work: no trace is printed and the polling
>> continues. Here is the complete code, based on
>> https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>> with my addition between the // change start and // change end markers:
>>
>> package org.apache.camel.example.kafka;
>>
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Endpoint;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.kafka.KafkaEndpoint;
>> import org.apache.camel.component.properties.PropertiesComponent;
>> import org.apache.camel.impl.DefaultCamelContext;
>> import org.apache.camel.impl.DefaultEndpoint;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.Collection;
>> import java.util.HashMap;
>> import java.util.Iterator;
>>
>> public final class MessageConsumerClient {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerClient.class);
>>
>>     private MessageConsumerClient() {
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         LOG.info("About to run Kafka-camel integration...");
>>
>>         CamelContext camelContext = new DefaultCamelContext();
>>
>>         // Add route to send messages to Kafka
>>
>>         camelContext.addRoutes(new RouteBuilder() {
>>             public void configure() {
>>                 PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
>>                 pc.setLocation("classpath:application.properties");
>>
>>                 log.info("About to start route: Kafka Server -> Log ");
>>                 onException(Exception.class).process(new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange) throws Exception {
>>                         System.out.println("Exception occurred!!");
>>                     }
>>                 });
>>                 from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
>>                         + "&maxPollRecords={{consumer.maxPollRecords}}"
>>                         + "&consumersCount={{consumer.consumersCount}}"
>>                         + "&seekTo={{consumer.seekTo}}"
>>                         + "&groupId={{consumer.group}}")
>>                         .routeId("FromKafka")
>>                         .log("${body}");
>>             }
>>         });
>>
>>         camelContext.start();
>>
>>         //
>>         // change start
>>         //
>>         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>         for (Endpoint endpoint : endpoints) {
>>             if (endpoint instanceof DefaultEndpoint) {
>>                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>                 endpoint1.setBridgeErrorHandler(true);
>>                 final HashMap<String, Object> consumerProperties = new HashMap<>();
>>                 consumerProperties.put("backoffMultiplier", 10);
>>                 consumerProperties.put("backoffErrorThreshold", 5);
>>                 endpoint1.setConsumerProperties(consumerProperties);
>>             }
>>         }
>>         //
>>         // change end here
>>         //
>>
>>         // let it run for 5 minutes before shutting down
>>         Thread.sleep(5 * 60 * 1000);
>>
>>         camelContext.stop();
>>     }
>>
>> }
>>
>>
>>
>>
>>
>> 2017-11-25 19:18 GMT+02:00 Claus Ibsen <claus.ibsen@gmail.com>:
>>
>>> Post the stack trace so we can see from where the error is thrown.
>>>
>>>
>>> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf
>>> <yacov.schondorf@gmail.com> wrote:
>>> > I am trying to detect when Kafka is not available. I have modified the
>>> > example -
>>> >
>>> > https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>>> > and added the following code right after camelContext.start():
>>> >
>>> >
>>> >
>>> >         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>> >
>>> >         for (Endpoint endpoint : endpoints) {
>>> >
>>> >             if (endpoint instanceof DefaultEndpoint) {
>>> >
>>> >                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>> >
>>> >                 endpoint1.setBridgeErrorHandler(true);
>>> >
>>> >                 final HashMap<String, Object> consumerProperties = new HashMap<>();
>>> >
>>> >                 consumerProperties.put("backoffMultiplier", 10);
>>> >
>>> >                 consumerProperties.put("backoffErrorThreshold", 5);
>>> >
>>> >                 endpoint1.setConsumerProperties(consumerProperties);
>>> >
>>> >             }
>>> >
>>> >         }
>>> >
>>> >
>>> >
>>> > I ran the main() and hoped to see the consumer stop attempting to
>>> > connect to Kafka after 5 tries, but this did not work. I keep getting
>>> > output messages of “Connection to node -1 could not be established.
>>> > Broker may not be available.”
>>> >
>>> > Is this the right way to go? What am I doing wrong?
>>> >
>>> >
>>> > Thanks.
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> http://davsclaus.com @davsclaus
>>> Camel in Action 2: https://www.manning.com/ibsen2
>>>
>>
>>
>>
