flume-user mailing list archives

From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re: Kafka sink deleted, brokerList null
Date Wed, 25 Mar 2015 19:15:53 GMT
Set this param:
  a1.sinks.k1.brokerList = <list of brokers>
instead of:
  a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
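
The mismatch can be reproduced outside Flume. What follows is only a minimal sketch, not the sink's actual code: a plain Map stands in for the Flume Context, getBrokerList is a hypothetical helper, and the literal "brokerList" is inferred from the parameter name above rather than copied from KafkaSinkConstants. It shows that a lookup by the sink's own key returns null when only the kafka.-prefixed key is present, which would trigger the null check quoted below.

```java
import java.util.HashMap;
import java.util.Map;

public class BrokerListLookup {
    // Assumed value of the key the sink asks the context for.
    static final String BROKER_LIST_FLUME_KEY = "brokerList";

    // Hypothetical stand-in for Context.getString(key): null when the key is absent.
    static String getBrokerList(Map<String, String> context) {
        return context.get(BROKER_LIST_FLUME_KEY);
    }

    public static void main(String[] args) {
        // Sink sub-properties as they appear in the logged context.
        Map<String, String> original = new HashMap<>();
        original.put("topic", "test");
        original.put("kafka.metadata.broker.list", "localhost:9091,localhost:9092");

        // Same settings with the broker list under the key the sink reads.
        Map<String, String> corrected = new HashMap<>();
        corrected.put("topic", "test");
        corrected.put("brokerList", "localhost:9091,localhost:9092");

        System.out.println("original:  " + getBrokerList(original));
        System.out.println("corrected: " + getBrokerList(corrected));
    }
}
```

With the original keys the lookup finds nothing, even though the broker addresses are present in the context under a different name.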


Thanks,
Hari

On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <atannir@gmail.com> wrote:

> Hello,
>
> When running Flume with Kafka as a sink, an error is logged that
> "brokerList must contain at least one Kafka broker", but the log line
> immediately before it shows the host:port entries as they were entered in
> the config file and stored in the context.
>
> Everything works when I hardcode the host:port into the brokerList string
> and skip the failing null check, but that is a suboptimal solution. The
> Kafka instances are from the Kafka quickstart guide and have no issues.
>
> Why isn't the value being selected from the context?
>
>
> flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:
>
>   private static void addDocumentedKafkaProps(Context context,
>                                               Properties kafkaProps)
>           throws ConfigurationException {
>     String brokerList = context.getString(KafkaSinkConstants
>             .BROKER_LIST_FLUME_KEY);
>     if (brokerList == null) {
>       throw new ConfigurationException("brokerList must contain at least " +
>               "one Kafka broker");
>     }
>     kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
>
>     String requiredKey = context.getString(
>             KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
>
>     if (requiredKey != null ) {
>       kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
>     }
>   }
>
>
>
> Config:
>
> # Describe the sink
> a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
> a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
> a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
> a1.sinks.k1.topic = test
>
> logs/flume.log
>
> 25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=test, kafka.metadata.broker.list=localhost:9091,localhost:9092, kafka.zookeeper.connect=localhost:2181, type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
> 25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427)  - Sink k1 has been removed due to an error during configuration
> org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
>         at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
>         at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
>         at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
>         at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
>         at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
>         at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
>         at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> git clone https://github.com/apache/flume.git
> mvn compile install -DskipTests
> Version 1.6.0-SNAPSHOT from today
>
> Thanks!
>
