gearpump-user mailing list archives

From "舒琦"<sh...@eefung.com>
Subject Re: Questions About Kafka Source
Date Wed, 04 May 2016 10:43:43 GMT
Hi Manu,


One more thing: the app always shows the following:




————————
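舒琦 (Qi Shu)
Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, 27 Wenxuan Road, Yuelu District, Changsha
Website: http://www.eefung.com
Weibo: http://weibo.com/eefung
Postal code: 410013
Tel: 400-677-0986
Fax: 0731-88519609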


Original Message
From: 舒琦 <shuqi@eefung.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 18:41
Subject: Re: Questions About Kafka Source


Hi Manu,


The topic in our Kafka cluster has 8 partitions. After I set the parallelism property of the Kafka source
to 8, messages started flowing through the kafka2kafka app and I can consume messages from
the sink topic, but it lags a lot.


I wonder whether the lag is caused by polling the partitions round-robin?
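
To make the round-robin question concrete, here is a minimal sketch of how 8 partitions could be spread over source tasks. It assumes a simple modulo assignment (partition p goes to task p % parallelism); that rule, the class name, and the method name are illustrative assumptions, not taken from Gearpump's source.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    // Hypothetical helper: partition p is handled by task (p % parallelism).
    public static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            tasks.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            tasks.get(p % parallelism).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // One task has to visit all 8 partitions in turn.
        System.out.println(assign(8, 1)); // [[0, 1, 2, 3, 4, 5, 6, 7]]
        // Eight tasks each own exactly one partition.
        System.out.println(assign(8, 8)); // [[0], [1], [2], [3], [4], [5], [6], [7]]
    }
}
```

Under this assumption, a task that owns several partitions reads them one after another, so raising parallelism to the partition count removes that serialization. Any lag that remains would have to come from somewhere else.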


Thanks.





Original Message
From: 舒琦 <shuqi@eefung.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 15:10
Subject: Re: Questions About Kafka Source


Hi Manu,


Which Kafka version did you use in your test? And is the Kafka source in Gearpump compatible with Kafka
version 0.8.2?


Thanks.




Original Message
From: 舒琦 <shuqi@eefung.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 14:37
Subject: Re: Questions About Kafka Source


Hi Manu,




Thanks.




Original Message
From: Manu Zhang <owenzhang1990@gmail.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 14:29
Subject: Re: Questions About Kafka Source


Hi Qi,


Could you also paste information about your Kafka topic? For example, the output of
"bin/kafka-topics --list --zookeeper zk11:3181,zk12:3181"




On Wed, May 4, 2016 at 2:26 PM 舒琦 shuqi@eefung.com wrote:

Hi Manu,


I have already modified the Kafka configuration; the code is shown below:




I know there must be something wrong on my side, but I have no idea what it is.


Do you have any other suggestions? Thanks.






Original Message
From: Manu Zhang <owenzhang1990@gmail.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 14:21
Subject: Re: Questions About Kafka Source


Hi Qi,


The example hardcodes the topic to be "inputTopic". Please check.
I've just verified kafka2kafka pipeline from gearpump-java-example 2.10-0.7.5 runs successfully
on gearpump 2.10-0.7.6. The screenshot is attached.


On Wed, May 4, 2016 at 1:45 PM 舒琦 shuqi@eefung.com wrote:

Hi Manu,


I just found that the app I built was compiled against gearpump 2.11-0.7.5. I’ll rebuild it and try
again.


Thanks.






Original Message
From: 舒琦 <shuqi@eefung.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016, 13:27
Subject: Re: Questions About Kafka Source


Hi Manu,


Our Kafka cluster’s version is 2.10-0.8.2.0, and the command-line tools can produce and consume messages
on the same topic.


===Test case A===
Gearpump: 2.10-0.7.6
app: kafka2kafka in gearpump-java-example


When I start the app, I get the following exception:


[ERROR] [05/04/2016 13:03:51.209] [ActorSystemImpl] Uncaught error from thread [app4system1-gearpump.shared-thread-pool-dispatcher-18]
shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:547)
at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:531)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.utils.ZkUtils$.getPartitionAssignmentForTopics(ZkUtils.scala:531)
at kafka.utils.ZkUtils$.getPartitionsForTopics(ZkUtils.scala:553)
at io.gearpump.streaming.kafka.lib.KafkaUtil$.getTopicAndPartitions(KafkaUtil.scala:57)
at io.gearpump.streaming.kafka.KafkaSource.open(KafkaSource.scala:152)
at io.gearpump.streaming.source.DataSourceTask.onStart(DataSourceTask.scala:49)
at io.gearpump.streaming.task.TaskWrapper.onStart(TaskWrapper.scala:90)
at io.gearpump.streaming.task.TaskActor.onStart(TaskActor.scala:101)
at io.gearpump.streaming.task.TaskActor.io$gearpump$streaming$task$TaskActor$$onStartClock(TaskActor.scala:199)
at io.gearpump.streaming.task.TaskActor$$anonfun$waitForStartClock$1.applyOrElse(TaskActor.scala:216)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at io.gearpump.streaming.task.TaskActor.aroundReceive(TaskActor.scala:41)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


===Test case B===
Gearpump: 2.11-0.7.6
app: kafka2kafka in gearpump-java-example


About 7 minutes after the app started, I got the following exception:


[INFO] [05/04/2016 13:16:09.671] [ClientCnxn] EventThread shut down
[WARN] [05/04/2016 13:23:09.348] [Selector] Error in I/O with buka2/172.19.105.20
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)


I am a little confused about why gearpump 2.10-0.7.6 throws this exception, which may be
caused by a Scala version mismatch, while 2.11-0.7.6 does not.
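
A NoSuchMethodError on scala.Predef$.ArrowAssoc is a common symptom of mixing binaries built for Scala 2.10 and Scala 2.11 on one classpath. A minimal sketch of a pre-flight check follows, assuming version labels in the "<scala>-<version>" form used in this thread (for example "2.10-0.7.6"). The class and method names are hypothetical.

```java
public class ScalaBinaryCheck {
    // Extracts the Scala binary version, e.g. "2.10" from "2.10-0.7.6".
    // Assumes the "<scala>-<version>" label format used in this thread.
    public static String scalaBinaryVersion(String label) {
        int dash = label.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("expected <scala>-<version>, got: " + label);
        }
        return label.substring(0, dash);
    }

    // True when both labels were built for the same Scala binary version.
    public static boolean sameScalaBinary(String a, String b) {
        return scalaBinaryVersion(a).equals(scalaBinaryVersion(b));
    }

    public static void main(String[] args) {
        // Cluster on 2.10-0.7.6, app built against 2.11-0.7.5: mismatch.
        System.out.println(sameScalaBinary("2.10-0.7.6", "2.11-0.7.5")); // false
        // Cluster on 2.10-0.7.6, Kafka 2.10-0.8.2.0: same Scala binary version.
        System.out.println(sameScalaBinary("2.10-0.7.6", "2.10-0.8.2.0")); // true
    }
}
```

The check only compares labels. It cannot see which Scala library or Kafka jar actually ends up on the runtime classpath, so treat it as a reminder of what has to line up rather than as proof.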


Thanks.




Original Message
From: Manu Zhang <owenzhang1990@gmail.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Tue, May 3, 2016, 16:55
Subject: Re: Questions About Kafka Source


Hi Qi,


I don't see anything suspicious in the log. Could you try out the example at
https://github.com/gearpump/gearpump-java-example/tree/master/src/main/java/kafka2kafka
to see whether it's a framework bug?


"group-id" is set to "gearpump" if not configured by user. If you want to configure "group-id",
you may create KafkaSource like


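import java.util.Properties;
import io.gearpump.streaming.kafka.KafkaSource;

// storageFactory is assumed to be defined elsewhere in your application.
Properties properties = new Properties();
properties.put("group-id", "my-group");
properties.put("zookeeper.servers", "localhost:2181");
KafkaSource source = new KafkaSource("topic", properties, storageFactory);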
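
The fallback described above can be illustrated with plain java.util.Properties. This is a minimal sketch that only mirrors the stated behavior, assuming the key is spelled exactly "group-id" as in this message. The helper name is hypothetical and this is not Gearpump's own code.

```java
import java.util.Properties;

public class GroupIdDefault {
    // Hypothetical helper: returns the configured "group-id",
    // or "gearpump" when the key is absent.
    public static String effectiveGroupId(Properties properties) {
        return properties.getProperty("group-id", "gearpump");
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        System.out.println(effectiveGroupId(unset)); // gearpump

        Properties custom = new Properties();
        custom.put("group-id", "my-group");
        System.out.println(effectiveGroupId(custom)); // my-group

        // A differently spelled key such as "group.id" is not picked up here.
        Properties dotted = new Properties();
        dotted.put("group.id", "XXX");
        System.out.println(effectiveGroupId(dotted)); // gearpump
    }
}
```

If the source really looks up "group-id", a "group.id" entry would be silently ignored and the consumer would appear under the default group name.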




On Tue, May 3, 2016 at 3:13 PM 舒琦 shuqi@eefung.com wrote:

Hi Manu,


Could you also help me check the log in the attachment?


How can I specify a group id when using the Kafka source? For now I just set “group.id=XXX” in
UserConfig.


Thanks.






Original Message
From: 舒琦 <shuqi@eefung.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Tue, May 3, 2016, 14:19
Subject: Re: Questions About Kafka Source


Hi Manu,


Gearpump: 0.7.6_2.11
Kafka: 0.8.2.1_2.10.


Thanks.




Original Message
From: Manu Zhang <owenzhang1990@gmail.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Tue, May 3, 2016, 14:13
Subject: Re: Questions About Kafka Source


Hi Qi,


Your code looks right. Which Gearpump and Kafka versions are you using?






On Tue, May 3, 2016 at 1:37 PM 舒琦 shuqi@eefung.com wrote:

Hi Manu,


Thanks for your help.


I used kafka-console-consumer with the same ZooKeeper servers and topic, and it can consume messages.
There are still lots of messages in that topic.


Below is the function I use to create the Kafka source. Could you please check whether it
is OK? Thanks.








Original Message
From: Manu Zhang <owenzhang1990@gmail.com>
To: user <user@gearpump.incubator.apache.org>
Sent: Tue, May 3, 2016, 12:40
Subject: Re: Questions About Kafka Source


Hi Qi,


Neither the red balloon nor the message receive throughput means that any message has been
consumed by KafkaSource. Those are messages the source sends to itself to trigger the next Task execution.
The metrics are a bit confusing and I think we need to fix this.
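
A toy model may help show why the receive counter can climb while nothing is consumed. The sketch below is not Gearpump code. It assumes a task that re-triggers itself by putting a "tick" into its own mailbox, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SelfTriggeringSource {
    int received = 0; // messages delivered to the task, including self-sent ticks
    int consumed = 0; // records actually read from upstream

    private final Queue<String> mailbox = new ArrayDeque<>();
    private final Queue<String> upstream = new ArrayDeque<>(); // stays empty in this model

    // Processes up to `steps` mailbox messages.
    void run(int steps) {
        mailbox.add("tick"); // initial trigger
        for (int i = 0; i < steps && !mailbox.isEmpty(); i++) {
            mailbox.poll();
            received++;               // counted as a received message
            if (upstream.poll() != null) {
                consumed++;           // never happens here: upstream is empty
            }
            mailbox.add("tick");      // schedule the next execution
        }
    }

    public static void main(String[] args) {
        SelfTriggeringSource task = new SelfTriggeringSource();
        task.run(1000);
        System.out.println(task.received + " received, " + task.consumed + " consumed");
        // prints "1000 received, 0 consumed"
    }
}
```

In this model the receive count reflects only how often the task ran, which matches the explanation that the throughput shown is not evidence of data read from Kafka.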


Yes, both the zookeeper servers and kafka brokers configs are comma-separated list strings. One
way to check whether your configuration is correct is to consume from the topic using kafka-console-consumer.
This also makes sure the topic has data to consume.


Hope this helps.


Thanks,
Manu


On Tue, May 3, 2016 at 12:07 PM 舒琦 shuqi@eefung.com wrote:

Hi,


I constructed a DAG as shown below. The “kafka source” consumes messages from the Kafka topic
“webs”. Its metrics show that it consumes lots of messages, but actually no
messages are handled, and I can’t find an active consumer group under topic “webs”. The log looks fine
too.


I am also wondering about the Kafka properties for zks and brokers: if there is a list of ZooKeeper
servers, should I separate them with commas, like below?


zks=zk1:3181,zk2:3181,zk3:3181
brokers=kfk1:9096,kfk2:9096,kfk3:9096
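
A comma-separated host:port list like the two above can be sanity-checked before it is handed to the source. The following is a minimal sketch, assuming each entry must be host:port with a numeric port. The class and method names are hypothetical, and this check is not part of Gearpump or Kafka.

```java
import java.util.ArrayList;
import java.util.List;

public class HostListCheck {
    // Splits a comma-separated list and verifies each entry looks like host:port.
    public static List<String> parseHostPorts(String value) {
        List<String> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got: " + entry);
            }
            // Throws NumberFormatException (an IllegalArgumentException) for a non-numeric port.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + entry);
            }
            result.add(trimmed);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseHostPorts("zk1:3181,zk2:3181,zk3:3181"));
        // [zk1:3181, zk2:3181, zk3:3181]
        System.out.println(parseHostPorts("kfk1:9096,kfk2:9096,kfk3:9096").size()); // 3
    }
}
```

This only validates the shape of the string. Whether the hosts are reachable and the topic has data is better confirmed by consuming with kafka-console-consumer.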


Thanks for your help.






————————
Qi Shu