flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gonzalo Herreros <gherre...@gmail.com>
Subject Re: KafkaSource not picking up any messages
Date Fri, 05 Feb 2016 07:15:52 GMT
I'm concerned with the warning "no brokers found when trying to rebalance"
Double check that the path in zookeeper is correct zk01:2181/mesos-kafka
and it's not the standard /kafka

When you connect with the kafka-console-consumer, do you specify
/mesos-kafka or just zk01:2181?
You can use the zkclient tool to check if there are brokers currently
registered under that path for the topic "test"

Regards,
Gonzalo


On 4 February 2016 at 21:16, Justin Ryan <juryan@ziprealty.com> wrote:

> Hiya folks,
>
> Iā€™m setting up a new environment with Kafka, Flume, and HDFS, and have
> implemented the simplest possible testing configuration I can come up
> with.  It logs successfully configuring and starting the KafkaSource, and
> with kafka tools I can confirm that messages have been sent, but the JSON
> Metrics from Flume show 0 messages processed.
>
> Are there any more tools at my disposal to investigate? Any assistance
> would be greatly appreciated!
>
> My config and log:
>
> ā€”
> # generated by Chef for mesos10, changes will be overwritten
>
> flume1.sources=kafka-source-test
> flume1.channels=hdfs-channel-kafka
> flume1.sinks=hdfs-sink-kafka
>
> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
> flume1.sources.kafka-source-test.topic=test
> flume1.sources.kafka-source-test.groupId=flume
> flume1.sources.kafka-source-test.interceptors=i1
> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
> flume1.sources.kafka-source-test.consumer.timeout.ms=100
> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
> flume1.channels.hdfs-channel-kafka.type=memory
> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
> flume1.sinks.hdfs-sink-kafka.type=hdfs
> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
> flume1.channels.hdfs-channel-kafka.capacity=10
> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
> ā€”
>
> Startup log (less incredibly long path lines):
> ā€”
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Configuration provider starting
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Reloading configuration file:/etc/flume/conf.chef/flume.conf
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks:
> hdfs-sink-kafka Agent: flume1
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
> configuration contains configuration for agents: [flume1]
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating
> channels
> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
> channel hdfs-channel-kafka type memory
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
> hdfs-channel-kafka
> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
> parameters:{interceptors.i1.type=timestamp,
> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
> groupId=flume, consumer.timeout.ms=100, topic=test,
> type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
> hdfs-sink-kafka, type: hdfs
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
> sourceRunners:{kafka-source-test=PollableSourceRunner: {
> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
> counterGroup:{ name:null counters:{} } }}
> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
> name:null counters:{} } }}
> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
> hdfs-channel-kafka}} }
> 16/02/04 11:32:07 INFO node.Application: Starting Channel
> hdfs-channel-kafka
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
> registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
> type: CHANNEL, name: hdfs-channel-kafka started
> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SINK, name: hdfs-sink-kafka: Successfully
> registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
> type: SINK, name: hdfs-sink-kafka started
> 16/02/04 11:32:07 INFO mortbay.log: Logging to
> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
> org.mortbay.log.Slf4jLog
> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
> 16/02/04 11:32:07 INFO mortbay.log: Started
> SelectChannelConnector@0.0.0.0:34545
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> auto.commit.enable is overridden to false
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> consumer.timeout.ms is overridden to 10
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
> overridden to flume
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> zookeeper.connect is overridden to zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
> zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
> thread.
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name
> =mesos10
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.version=1.8.0_72-internal
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.vendor=Oracle Corporation
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.io.tmpdir=/tmp
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.compiler=<NA>
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name
> =Linux
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.arch=amd64
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.version=3.13.0-63-generic
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name
> =marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:user.home=/opt/marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
> using SASL (unknown error)
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established
> to 10.100.6.251/10.100.6.251:2181, initiating session
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment
> complete on server 10.100.6.251/10.100.6.251:2181, sessionid =
> 0x152858b1cc07491, negotiated timeout = 6000
> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
> (SyncConnected)
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread
> for consumer flume_mesos10-1454614328204-ca8a74df
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
> rebalance.
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test
> do started.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component
> type: SOURCE, name: kafka-source-test started
> --
>
> --
> Justin Alan Ryan
> Sr. Systems / Release Engineer
> ZipRealty
>

Mime
View raw message