Hi Manu,


One more thing: the app always shows the following:



————————
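舒琦 (Qi Shu)
Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
Website: http://www.eefung.com
Weibo: http://weibo.com/eefung
Postal code: 410013
Tel: 400-677-0986
Fax: 0731-88519609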

Original Message
From: 舒琦<shuqi@eefung.com>
To: user<user@gearpump.incubator.apache.org>
Sent: Wed, May 4, 2016 18:41
Subject: Re: Questions About Kafka Source

Hi Manu,


The topic in our Kafka cluster has 8 partitions, so I set the parallelism of the Kafka source to 8.

I found that messages were flowing through the kafka2kafka app and I could consume messages from the sink topic, but it lagged a lot.


I wonder whether it is caused by polling the partitions round-robin?
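To reason about the round-robin question, here is a minimal Java sketch of one possible partition-to-task mapping. The modulo assignment (`p % parallelism`) and the names `PartitionAssignmentSketch`, `assign` and `roundRobinOrder` are assumptions for illustration, not Gearpump's actual KafkaSource logic. Under this assumed mapping, 8 partitions with parallelism 8 give each source task exactly one partition, so per-task round-robin polling would have nothing to alternate between; it only comes into play when a task owns several partitions.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical assignment: partition p goes to source task (p % parallelism).
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            tasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            tasks.get(p % parallelism).add(p);
        }
        return tasks;
    }

    // Order in which one task would visit its own partitions when it
    // polls them round-robin for the given number of rounds.
    static List<Integer> roundRobinOrder(List<Integer> partitions, int rounds) {
        List<Integer> order = new ArrayList<>();
        if (partitions.isEmpty()) {
            return order; // a task with no partitions never polls
        }
        for (int i = 0; i < rounds; i++) {
            order.add(partitions.get(i % partitions.size()));
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(assign(8, 8)); // [[0], [1], [2], [3], [4], [5], [6], [7]]
        List<List<Integer>> fewer = assign(8, 2);
        System.out.println(fewer); // [[0, 2, 4, 6], [1, 3, 5, 7]]
        System.out.println(roundRobinOrder(fewer.get(0), 6)); // [0, 2, 4, 6, 0, 2]
    }
}
```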

Thanks.


Original Message
From: 舒琦<shuqi@eefung.com>
Sent: Wed, May 4, 2016 15:10
Subject: Re: Questions About Kafka Source

Hi Manu,


Which Kafka version did you use in your test? And is the Kafka source in Gearpump compatible with Kafka 0.8.2?


Thanks.



Original Message
From: 舒琦<shuqi@eefung.com>
Sent: Wed, May 4, 2016 14:37
Subject: Re: Questions About Kafka Source

Hi Manu,



Thanks.



Original Message
From: Manu Zhang<owenzhang1990@gmail.com>
Sent: Wed, May 4, 2016 14:29
Subject: Re: Questions About Kafka Source

Hi Qi,

Could you also paste information about your Kafka topic? For example, the output of "bin/kafka-topics --list --zookeeper zk11:3181,zk12:3181"


On Wed, May 4, 2016 at 2:26 PM 舒琦 <shuqi@eefung.com> wrote:

Hi Manu,


I have already modified the Kafka configuration; the code is shown below:



I know there must be something wrong on my side, but I have no idea what it is.

Do you have any other suggestions? Thanks.



Original Message
From: Manu Zhang<owenzhang1990@gmail.com>
Sent: Wed, May 4, 2016 14:21
Subject: Re: Questions About Kafka Source

Hi Qi,

The example hardcodes the topic as "inputTopic". Please check.
I've just verified that the kafka2kafka pipeline from gearpump-java-example 2.10-0.7.5 runs successfully on gearpump 2.10-0.7.6. The screenshot is attached.
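Because the topic name is hardcoded, one way to avoid editing the example each time is to take it from the command line. This is a hypothetical Java sketch: the kafka2kafka example is not known to accept such an argument, and `TopicConfigSketch` and `resolveTopic` are illustrative names; only the "inputTopic" default comes from the thread.

```java
public class TopicConfigSketch {

    // Topic that the kafka2kafka example hardcodes, per the thread.
    static final String DEFAULT_TOPIC = "inputTopic";

    // Hypothetical helper: use the first command-line argument as the
    // source topic, falling back to the example's hardcoded default.
    static String resolveTopic(String[] args) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        return DEFAULT_TOPIC;
    }

    public static void main(String[] args) {
        System.out.println("Reading from topic: " + resolveTopic(args));
    }
}
```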

On Wed, May 4, 2016 at 1:45 PM 舒琦 <shuqi@eefung.com> wrote:

Hi Manu,


I just found that the app I built was compiled against gearpump 2.11-0.7.5. I'll rebuild it and try again.


Thanks.




Original Message
From: 舒琦<shuqi@eefung.com>
Sent: Wed, May 4, 2016 13:27
Subject: Re: Questions About Kafka Source

Hi Manu,


Our Kafka cluster’s version is 2.10-0.8.2.0, and the command-line tools can produce and consume messages on the same topic.


===Test case A===

Gearpump: 2.10-0.7.6

app: kafka2kafka in gearpump-java-example


When I started the app, I got the following exception:


[ERROR] [05/04/2016 13:03:51.209] [ActorSystemImpl] Uncaught error from thread [app4system1-gearpump.shared-thread-pool-dispatcher-18] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:547)
at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:531)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.utils.ZkUtils$.getPartitionAssignmentForTopics(ZkUtils.scala:531)
at kafka.utils.ZkUtils$.getPartitionsForTopics(ZkUtils.scala:553)
at io.gearpump.streaming.kafka.lib.KafkaUtil$.getTopicAndPartitions(KafkaUtil.scala:57)
at io.gearpump.streaming.kafka.KafkaSource.open(KafkaSource.scala:152)
at io.gearpump.streaming.source.DataSourceTask.onStart(DataSourceTask.scala:49)
at io.gearpump.streaming.task.TaskWrapper.onStart(TaskWrapper.scala:90)
at io.gearpump.streaming.task.TaskActor.onStart(TaskActor.scala:101)
at io.gearpump.streaming.task.TaskActor.io$gearpump$streaming$task$TaskActor$$onStartClock(TaskActor.scala:199)
at io.gearpump.streaming.task.TaskActor$$anonfun$waitForStartClock$1.applyOrElse(TaskActor.scala:216)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at io.gearpump.streaming.task.TaskActor.aroundReceive(TaskActor.scala:41)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


===Test case B===

Gearpump: 2.11-0.7.6

app: kafka2kafka in gearpump-java-example


About 7 minutes after the app started, I got the following exception:

[INFO] [05/04/2016 13:16:09.671] [ClientCnxn] EventThread shut down
[WARN] [05/04/2016 13:23:09.348] [Selector] Error in I/O with buka2/172.19.105.20
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)

I am a little confused about why gearpump 2.10-0.7.6 throws this exception while 2.11-0.7.6 does not. I suspect it is caused by a different Scala version.
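One plausible reading of Test case A: `scala.Predef$.ArrowAssoc(Object)` is the method Scala 2.11 compiles `->` against, and Scala 2.10's `Predef` has no method by that name, so a `NoSuchMethodError` on it usually means a class compiled for Scala 2.11 (here Kafka's `ZkUtils`) is running on a Scala 2.10 runtime. A quick sanity check is to compare the `_2.10` / `_2.11` suffixes of the jars that end up on the classpath. The minimal Java sketch below does that; the jar names in `main` are hypothetical examples, not taken from the actual app jar.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalaSuffixCheck {

    // Matches the Scala binary suffix in names like "kafka_2.10-0.8.2.1.jar".
    private static final Pattern SUFFIX = Pattern.compile("_(\\d+\\.\\d+)-");

    // Maps each jar name to its Scala binary version; jars without a
    // suffix (e.g. plain Java libraries) are skipped.
    static Map<String, String> scalaVersions(String... jarNames) {
        Map<String, String> result = new TreeMap<>();
        for (String name : jarNames) {
            Matcher m = SUFFIX.matcher(name);
            if (m.find()) {
                result.put(name, m.group(1));
            }
        }
        return result;
    }

    // True if every suffixed jar targets the same Scala binary version.
    static boolean consistent(String... jarNames) {
        return scalaVersions(jarNames).values().stream().distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // Hypothetical mix: a 2.10 runtime with 2.11 artifacts bundled in the app jar.
        String[] jars = {
            "gearpump-core_2.10-0.7.6.jar",
            "gearpump-external-kafka_2.11-0.7.5.jar",
            "kafka_2.11-0.8.2.1.jar"
        };
        System.out.println(scalaVersions(jars));
        System.out.println("consistent = " + consistent(jars));
    }
}
```

If the check reports a mix, rebuilding the app against the Scala version of the running Gearpump cluster (as planned earlier in the thread) is the usual fix.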

Thanks.


Original Message
From: Manu Zhang<owenzhang1990@gmail.com>
Sent: Tue, May 3, 2016 16:55
Subject: Re: Questions About Kafka Source

Hi Qi,

I don't see anything suspicious in the log. Could you try out the https://github.com/gearpump/gearpump-java-example/tree/master/src/main/java/kafka2kafka example to see whether it's a framework bug?

"group-id" is set to "gearpump" if not configured by the user. If you want to configure "group-id", you can create the KafkaSource like this:

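import java.util.Properties;
import io.gearpump.streaming.kafka.KafkaSource;

Properties properties = new Properties();
properties.put("group-id", "my-group");                 // falls back to "gearpump" if omitted
properties.put("zookeeper.servers", "localhost:2181");
// storageFactory: the offset storage factory your application already creates
KafkaSource source = new KafkaSource("topic", properties, storageFactory);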
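The fallback described above behaves like a `Properties` lookup with a default value. This minimal Java sketch mimics it under that assumption; `GroupIdDefaultSketch` and `effectiveGroupId` are hypothetical names, not Gearpump code. It also illustrates why a key spelled "group.id" would not be picked up by a lookup on "group-id".

```java
import java.util.Properties;

public class GroupIdDefaultSketch {

    // Default mentioned in the thread when "group-id" is not configured.
    static final String DEFAULT_GROUP_ID = "gearpump";

    // Hypothetical lookup mirroring the described behavior: use the
    // user-supplied "group-id" if present, otherwise "gearpump".
    static String effectiveGroupId(Properties props) {
        return props.getProperty("group-id", DEFAULT_GROUP_ID);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "XXX");               // different key, ignored here
        System.out.println(effectiveGroupId(props)); // gearpump
        props.put("group-id", "my-group");
        System.out.println(effectiveGroupId(props)); // my-group
    }
}
```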


On Tue, May 3, 2016 at 3:13 PM 舒琦 <shuqi@eefung.com> wrote:

Hi Manu,


Could you also help me check the log in the attachment?


How can I specify a group id when using the Kafka source? Right now I just set “group.id=XXX” in UserConfig.


Thanks.




Original Message
From: 舒琦<shuqi@eefung.com>
Sent: Tue, May 3, 2016 14:19
Subject: Re: Questions About Kafka Source

Hi Manu,


Gearpump: 0.7.6_2.11

Kafka: 0.8.2.1_2.10.


Thanks.



Original Message
From: Manu Zhang<owenzhang1990@gmail.com>
Sent: Tue, May 3, 2016 14:13
Subject: Re: Questions About Kafka Source

Hi Qi,

Your code looks right. Which Gearpump and Kafka versions are you using?



On Tue, May 3, 2016 at 1:37 PM 舒琦 <shuqi@eefung.com> wrote:

Hi Manu,


Thanks for your help.


I used kafka-console-consumer with the same zks and topic, and it can consume messages. There are still lots of messages in that topic.


Below is the function I use to create the Kafka source. Could you please check whether it is OK? Thanks.





Original Message
From: Manu Zhang<owenzhang1990@gmail.com>
Sent: Tue, May 3, 2016 12:40
Subject: Re: Questions About Kafka Source

Hi Qi,

Neither the red balloon nor the "receive message" throughput means that any message has been consumed by KafkaSource. Those are messages the source sends to itself to trigger the next task execution. The metrics are a bit confusing, and I think we need to fix this.
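The self-triggering pattern described above can be sketched in a few lines of Java. This is a simplified single-threaded model, not Gearpump's TaskActor: `SelfTriggerSketch`, the `CONTINUE` message and both counters are hypothetical. It shows how a "receive message" counter keeps climbing while zero records are read from an empty topic.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class SelfTriggerSketch {

    static final String TRIGGER = "CONTINUE";

    int receivedMessages = 0; // what a "receive message" metric would count
    int consumedRecords = 0;  // records actually read from the external source

    // Runs the task for a fixed number of steps. Each step reads one record
    // if the source has one, then mails a trigger to itself so the next step
    // is scheduled, whether or not anything was read.
    void run(Iterator<String> source, int steps) {
        Deque<String> mailbox = new ArrayDeque<>();
        mailbox.add(TRIGGER);
        for (int i = 0; i < steps && !mailbox.isEmpty(); i++) {
            String msg = mailbox.poll();
            receivedMessages++;
            if (TRIGGER.equals(msg) && source.hasNext()) {
                source.next();
                consumedRecords++;
            }
            mailbox.add(TRIGGER); // self-message that triggers the next execution
        }
    }

    public static void main(String[] args) {
        SelfTriggerSketch task = new SelfTriggerSketch();
        List<String> emptyTopic = Collections.emptyList();
        task.run(emptyTopic.iterator(), 1000);
        System.out.println("received=" + task.receivedMessages
                + " consumed=" + task.consumedRecords);
    }
}
```

Running `main` against the empty topic prints `received=1000 consumed=0`.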

Yes, both the ZooKeeper servers and Kafka brokers configs are comma-separated list strings. One way to check whether your configuration is correct is to consume from the topic using kafka-console-consumer. This also makes sure the topic has data to consume.
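As a complement to the kafka-console-consumer check, here is a minimal Java sketch that splits a comma-separated `host:port` list (like the `zks` and `brokers` values in the question) and rejects malformed entries before they reach the source. `HostListSketch` and `parseHostList` are hypothetical names, and the validation rules are assumptions; a ZooKeeper string with a chroot suffix such as `/kafka` would be rejected by this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class HostListSketch {

    // Splits a comma-separated "host:port" list and validates each entry.
    // Integer.parseInt throws NumberFormatException (an
    // IllegalArgumentException) for a non-numeric port.
    static List<String> parseHostList(String value) {
        List<String> hosts = new ArrayList<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + entry + "'");
            }
            int port = Integer.parseInt(entry.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in '" + entry + "'");
            }
            hosts.add(entry);
        }
        return hosts;
    }

    public static void main(String[] args) {
        System.out.println(parseHostList("zk1:3181,zk2:3181,zk3:3181"));
        System.out.println(parseHostList("kfk1:9096,kfk2:9096,kfk3:9096"));
    }
}
```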

Hope this helps.

Thanks,
Manu

On Tue, May 3, 2016 at 12:07 PM 舒琦 <shuqi@eefung.com> wrote:

Hi,


I constructed a DAG as shown below. The “kafka source” consumes messages from the Kafka topic “webs”, and its metrics show that it consumes lots of messages, but actually no messages are handled, and I can’t find an active consumer group under topic “webs” either. The log looks OK too.


I'm also not sure about the Kafka properties for zks and brokers: if there is a list of ZooKeeper servers, should I separate them with commas, like below?


zks=zk1:3181,zk2:3181,zk3:3181

brokers=kfk1:9096,kfk2:9096,kfk3:9096


Thanks for your help.




————————
Qi Shu