flume-user mailing list archives

From: Foo Lim <foo....@vungle.com>
Subject: Re: 1.6 release date?
Date: Tue, 30 Dec 2014 02:37:10 GMT

Hi Frank,

Thanks for the reply. I have other topics in kafka that are working:

~/kafka_2.9.2-0.8.1.1$ bin/kafka-topics.sh --zookeeper localhost --describe
Topic:requests PartitionCount:1 ReplicationFactor:1 Configs:
Topic: requests Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
test
tst1

$ bin/kafka-console-consumer.sh --zookeeper localhost --topic test --from-beginning
test
tst1

Am I supposed to create the 'custom-topic' with 2 partitions before
running 'mvn clean install'?



Let's try creating the 'custom-topic':

$ bin/kafka-topics.sh --create --partitions 2 --topic custom-topic --zookeeper localhost --replication-factor 1
Created topic "custom-topic".

$ bin/kafka-topics.sh --zookeeper localhost --describe --topic custom-topic
Topic:custom-topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: custom-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: custom-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
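
The describe output above can also be checked programmatically. What follows is only a minimal sketch, assuming the per-partition line format shown above and assuming that a leader of -1 means no leader is assigned. The names parse_partitions and partitions_without_live_leader are hypothetical helpers, not part of Kafka or Flume.

```python
import re

# Matches the per-partition lines of `kafka-topics.sh --describe`, e.g.
# "Topic: custom-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0".
# The summary line ("Topic:custom-topic PartitionCount:2 ...") does not match.
PARTITION_LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def parse_partitions(describe_output):
    """Return one dict per partition line found in the describe output."""
    rows = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if match is None:
            continue  # skip summary lines and anything unrecognised
        rows.append({
            "topic": match.group("topic"),
            "partition": int(match.group("partition")),
            "leader": int(match.group("leader")),
            "replicas": [int(b) for b in match.group("replicas").split(",") if b],
            "isr": [int(b) for b in match.group("isr").split(",") if b],
        })
    return rows


def partitions_without_live_leader(rows):
    """List (topic, partition) pairs whose leader is missing or not in the ISR.

    Assumption: a negative leader id means "no leader assigned".
    """
    return [
        (row["topic"], row["partition"])
        for row in rows
        if row["leader"] < 0 or row["leader"] not in row["isr"]
    ]


if __name__ == "__main__":
    sample = """Topic:custom-topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: custom-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: custom-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0"""
    print(partitions_without_live_leader(parse_partitions(sample)))
```

For the custom-topic output above this reports no problem partitions, since both partitions list broker 0 as leader and broker 0 is in each ISR.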


$ cd ~/flume-ng-kafka-sink

$ mvn clean install
[...]
[2014-12-30 02:34:38,520] INFO Fetching metadata from broker
id:0,host:vagrant-ubuntu-precise-64,port:51064 with correlation id 26
for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$)
[2014-12-30 02:34:38,521] INFO Connected to
vagrant-ubuntu-precise-64:51064 for producing
(kafka.producer.SyncProducer)
[2014-12-30 02:34:38,525] INFO Disconnecting from
vagrant-ubuntu-precise-64:51064 (kafka.producer.SyncProducer)
[2014-12-30 02:34:38,526] INFO Closing socket connection to
/10.0.2.15. (kafka.network.Processor)
[2014-12-30 02:34:38,528] WARN [KafkaApi-0] Offset request with
correlation id 0 from client
group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0
on partition [custom-topic,1] failed due to Leader not local for
partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis)
[2014-12-30 02:34:38,530] WARN
[group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-leader-finder-thread],
Failed to add leader for partitions [custom-topic,1],[custom-topic,0];
will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.NotLeaderForPartitionException
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-12-30 02:34:38,536] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0],
Shutting down (kafka.consumer.ConsumerFetcherThread)
[2014-12-30 02:34:38,528] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0],
Starting  (kafka.consumer.ConsumerFetcherThread)
[2014-12-30 02:34:38,539] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0],
Shutdown completed (kafka.consumer.ConsumerFetcherThread)


On Mon, Dec 29, 2014 at 3:53 PM, Frank Yao <baniu.yao@gmail.com> wrote:
> hi foo
>
> it seems the exception in your stack trace was caused by kafka itself
>
> Failed to add leader for partitions
>
>
> I have used kafka sink and source of flume 1.6 for several weeks and it
> works well.
>
> Could you please try the kafka console producer first to test whether the
> partition is okay or not?
> Frank Yao
> @Vipshop, Shanghai
> from iPhone
>
> On Dec 30, 2014, at 04:21, Foo Lim <foo.lim@vungle.com> wrote:
>
> BTW, I followed the directions & ran
>
> ~/flume-ng-kafka-sink$ mvn clean install
>
> On Mon, Dec 29, 2014 at 12:10 PM, Foo Lim <foo.lim@vungle.com> wrote:
>
> Hi Gwen,
>
> Thanks for the reply.
>
> I'll try the CDH jar file. Where do I put it in the flume directory
> structure?
>
> I have kafka_2.9.2-0.8.1.1 running. Here's the test error (which keeps
> repeating) in the project
> git@github.com:thilinamb/flume-ng-kafka-sink.git
>
> [2014-12-29 20:02:34,028] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2014-12-29 20:02:34,029] INFO Property client.id is overridden to
> group_1 (kafka.utils.VerifiableProperties)
> [2014-12-29 20:02:34,030] INFO Property metadata.broker.list is
> overridden to vagrant-ubuntu-precise-64:50753
> (kafka.utils.VerifiableProperties)
> [2014-12-29 20:02:34,030] INFO Property request.timeout.ms is
> overridden to 30000 (kafka.utils.VerifiableProperties)
> [2014-12-29 20:02:34,031] INFO Fetching metadata from broker
> id:0,host:vagrant-ubuntu-precise-64,port:50753 with correlation id 18
> for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$)
> [2014-12-29 20:02:34,032] INFO Connected to
> vagrant-ubuntu-precise-64:50753 for producing
> (kafka.producer.SyncProducer)
> [2014-12-29 20:02:34,035] INFO Disconnecting from
> vagrant-ubuntu-precise-64:50753 (kafka.producer.SyncProducer)
> [2014-12-29 20:02:34,036] INFO Closing socket connection to
> /10.0.2.15. (kafka.network.Processor)
> [2014-12-29 20:02:34,038] WARN [KafkaApi-0] Offset request with
> correlation id 0 from client
> group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0
> on partition [custom-topic,1] failed due to Leader not local for
> partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis)
> [2014-12-29 20:02:34,040] WARN
> [group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-leader-finder-thread],
> Failed to add leader for partitions [custom-topic,1],[custom-topic,0];
> will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.NotLeaderForPartitionException
> at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
> at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
> at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
> at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
> at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
> at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
> at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-12-29 20:02:34,045] INFO
> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
> Shutting down (kafka.consumer.ConsumerFetcherThread)
> [2014-12-29 20:02:34,039] INFO
> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
> Starting  (kafka.consumer.ConsumerFetcherThread)
> [2014-12-29 20:02:34,046] INFO
> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
> Shutdown completed (kafka.consumer.ConsumerFetcherThread)
> [2014-12-29 20:02:34,047] INFO Closing socket connection to
> /10.0.2.15. (kafka.network.Processor)
> [2014-12-29 20:02:34,048] INFO
> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
> Stopped  (kafka.consumer.ConsumerFetcherThread)
>
> On Fri, Dec 26, 2014 at 4:06 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
>
> I can't say when's the 1.6 release, but I have other solutions :)
>
> 1. The packages that are part of CDH5.3 release will contain that jar.
> Perhaps use this distro? Or even just get the RPM, unpackage and dig the jar
> out?
> 2. Let us know what's the compilation error, perhaps we can help there?
>
> On Fri, Dec 26, 2014 at 3:30 PM, Foo Lim <foo.lim@vungle.com> wrote:
>
> Hi all,
>
> Happy holidays! Just wondering if there's any ETA on a 1.6 release.
> Looking forward to the kafka sink plugin that I can't get to compile
> independently. :-/
>
> Thanks!
>
