gearpump-user mailing list archives

From Manu Zhang <owenzhang1...@gmail.com>
Subject Re: Questions About Kafka Source
Date Wed, 04 May 2016 11:12:07 GMT
Hi Qi,

I used Kafka 0.9. Gearpump's KafkaSource is compatible with both Kafka 0.8
and Kafka 0.9.
What do you mean by "lagged a lot"? If the parallelism of KafkaSource is
the same as the number of partitions, there should be a one-to-one mapping
between KafkaSource tasks and partitions.
The application clock is supposed to advance as long as there are messages
coming in.
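
As a rough illustration of the one-to-one mapping mentioned above, here is a
minimal sketch. It assumes that partition p is handed to task
(p % parallelism); the class and method names are hypothetical and this is not
taken from Gearpump's actual grouping code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Gearpump's actual partition grouper.
class PartitionAssignmentSketch {

    // Returns, for each task index, the partition ids that task would read,
    // assuming partition p is assigned to task (p % parallelism).
    static List<List<Integer>> assign(int partitionCount, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> tasks = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            tasks.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            tasks.get(p % parallelism).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // 8 partitions, parallelism 8: every task owns exactly one partition.
        System.out.println(assign(8, 8));
        // 8 partitions, parallelism 3: tasks own several partitions each.
        System.out.println(assign(8, 3));
    }
}
```

Under this assumption, 8 partitions with parallelism 8 give each task exactly
one partition, while parallelism 3 would give task 0 partitions 0, 3 and 6.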

Do you have an IM account where I can contact you directly?


On Wed, May 4, 2016 at 6:44 PM 舒琦 <shuqi@eefung.com> wrote:

> Hi Manu,
>
>
> One more thing, the app always shows the following:
>
>
>
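> --------
> Shu Qi (舒琦)
> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, 27 Wenxuan Road, Yuelu District, Changsha
> Website: http://www.eefung.com
> Weibo: http://weibo.com/eefung
> Postal code: 410013
> Tel: 400-677-0986
> Fax: 0731-88519609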
>
>  Original message
> *From:* 舒琦<shuqi@eefung.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* Wed, May 4, 2016, 18:41
> *Subject:* Re: Questions About Kafka Source
>
> Hi Manu,
>
>
> The topic in our Kafka cluster has 8 partitions. After setting the
> parallelism of the Kafka source to 8, I found that messages were flowing
> through the kafka2kafka app and that I could consume messages from the sink
> topic, but it lagged a lot.
>
>
> I wonder if this is caused by polling the partitions round-robin?
>
> Thanks.
>
>  Original message
> *From:* 舒琦<shuqi@eefung.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* Wed, May 4, 2016, 15:10
> *Subject:* Re: Questions About Kafka Source
>
> Hi Manu,
>
>
> Which Kafka version did you use in your test? And is the Kafka source in
> Gearpump compatible with Kafka 0.8.2?
>
>
> Thanks.
>
>  Original message
> *From:* 舒琦<shuqi@eefung.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* Wed, May 4, 2016, 14:37
> *Subject:* Re: Questions About Kafka Source
>
> Hi Manu,
>
>
>
> Thanks.
>
>  Original message
> *From:* Manu Zhang<owenzhang1990@gmail.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* Wed, May 4, 2016, 14:29
> *Subject:* Re: Questions About Kafka Source
>
> Hi Qi,
>
> Could you also paste information about your Kafka topic? For example, the
> output of "bin/kafka-topics --list --zookeeper zk11:3181,zk12:3181"
>
>
> On Wed, May 4, 2016 at 2:26 PM 舒琦 <shuqi@eefung.com> wrote:
>
>> Hi Manu,
>>
>>
>> I have already modified the Kafka configuration; the code is shown below:
>>
>>
>>
>> I know there must be something wrong on my side, but I am out of ideas
>> for now.
>>
>> Do you have any other suggestions? Thanks.
>>
>>
>>  Original message
>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>> *To:* user<user@gearpump.incubator.apache.org>
>> *Sent:* Wed, May 4, 2016, 14:21
>> *Subject:* Re: Questions About Kafka Source
>>
>> Hi Qi,
>>
>> The example hardcodes the topic to be "inputTopic". Please check.
>> I've just verified that the kafka2kafka pipeline from gearpump-java-example
>> 2.10-0.7.5 runs successfully on Gearpump 2.10-0.7.6. The screenshot is
>> attached.
>>
>> On Wed, May 4, 2016 at 1:45 PM 舒琦 <shuqi@eefung.com> wrote:
>>
>>> Hi Manu,
>>>
>>>
>>> I just found that the app I built is against Gearpump 2.11-0.7.5. I’ll
>>> rebuild it and try again.
>>>
>>>
>>> Thanks.
>>>
>>>
>>>  Original message
>>> *From:* 舒琦<shuqi@eefung.com>
>>> *To:* user<user@gearpump.incubator.apache.org>
>>> *Sent:* Wed, May 4, 2016, 13:27
>>> *Subject:* Re: Questions About Kafka Source
>>>
>>> Hi Manu,
>>>
>>>
>>> Our Kafka cluster’s version is 2.10-0.8.2.0, and the command-line tools
>>> can produce to and consume from the same topic.
>>>
>>>
>>> ===Test case A===
>>>
>>> Gearpump: 2.10-0.7.6
>>>
>>> app: kafka2kafka in gearpump-java-example
>>>
>>>
>>> When starting the app, I got the exception below:
>>>
>>>
>>> [ERROR] [05/04/2016 13:03:51.209] [ActorSystemImpl] Uncaught error from thread [app4system1-gearpump.shared-thread-pool-dispatcher-18] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
>>> java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>>     at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:547)
>>>     at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:531)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at kafka.utils.ZkUtils$.getPartitionAssignmentForTopics(ZkUtils.scala:531)
>>>     at kafka.utils.ZkUtils$.getPartitionsForTopics(ZkUtils.scala:553)
>>>     at io.gearpump.streaming.kafka.lib.KafkaUtil$.getTopicAndPartitions(KafkaUtil.scala:57)
>>>     at io.gearpump.streaming.kafka.KafkaSource.open(KafkaSource.scala:152)
>>>     at io.gearpump.streaming.source.DataSourceTask.onStart(DataSourceTask.scala:49)
>>>     at io.gearpump.streaming.task.TaskWrapper.onStart(TaskWrapper.scala:90)
>>>     at io.gearpump.streaming.task.TaskActor.onStart(TaskActor.scala:101)
>>>     at io.gearpump.streaming.task.TaskActor.io$gearpump$streaming$task$TaskActor$$onStartClock(TaskActor.scala:199)
>>>     at io.gearpump.streaming.task.TaskActor$$anonfun$waitForStartClock$1.applyOrElse(TaskActor.scala:216)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>     at io.gearpump.streaming.task.TaskActor.aroundReceive(TaskActor.scala:41)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> ===Test case B===
>>>
>>> Gearpump: 2.11-0.7.6
>>>
>>> app: kafka2kafka in gearpump-java-example
>>>
>>> About 7 minutes after the app started, I got the exception below:
>>>
>>> [INFO] [05/04/2016 13:16:09.671] [ClientCnxn] EventThread shut down
>>> [WARN] [05/04/2016 13:23:09.348] [Selector] Error in I/O with buka2/172.19.105.20
>>> java.io.EOFException
>>>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> I am a little confused about why Gearpump 2.10-0.7.6 throws this
>>> exception, which may be caused by mismatched Scala versions, while
>>> 2.11-0.7.6 does not.
>>>
>>> Thanks.
>>>
>>>  Original message
>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>> *To:* user<user@gearpump.incubator.apache.org>
>>> *Sent:* Tue, May 3, 2016, 16:55
>>> *Subject:* Re: Questions About Kafka Source
>>>
>>> Hi Qi,
>>>
>>> I don't see anything suspicious in the log. Could you try out the
>>> https://github.com/gearpump/gearpump-java-example/tree/master/src/main/java/kafka2kafka
>>> example to see whether it's a framework bug?
>>>
>>> "group-id" is set to "gearpump" if not configured by the user. If you want
>>> to configure "group-id", you may create the KafkaSource like this:
>>>
>>> Properties properties = new Properties();
>>> properties.put("group-id", "my-group");
>>> properties.put("zookeeper.servers", "localhost:2181");
>>> KafkaSource source = new KafkaSource("topic", properties, storageFactory);
>>>
>>>
>>> On Tue, May 3, 2016 at 3:13 PM 舒琦 <shuqi@eefung.com> wrote:
>>>
>>>> Hi Manu,
>>>>
>>>>
>>>> Could you also help me check the log in the attachment?
>>>>
>>>>
>>>> How can I specify a group id when using the Kafka source? For now I just
>>>> set “group.id=XXX” in UserConfig.
>>>>
>>>>
>>>> Thanks.
>>>>
>>>>
>>>>  Original message
>>>> *From:* 舒琦<shuqi@eefung.com>
>>>> *To:* user<user@gearpump.incubator.apache.org>
>>>> *Sent:* Tue, May 3, 2016, 14:19
>>>> *Subject:* Re: Questions About Kafka Source
>>>>
>>>> Hi Manu,
>>>>
>>>>
>>>> Gearpump: 0.7.6_2.11
>>>>
>>>> Kafka: 0.8.2.1_2.10.
>>>>
>>>>
>>>> Thanks.
>>>>
>>>>  Original message
>>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>>> *To:* user<user@gearpump.incubator.apache.org>
>>>> *Sent:* Tue, May 3, 2016, 14:13
>>>> *Subject:* Re: Questions About Kafka Source
>>>>
>>>> Hi Qi,
>>>>
>>>> Your code looks right. Which Gearpump version and Kafka version are
>>>> you using?
>>>>
>>>>
>>>>
>>>> On Tue, May 3, 2016 at 1:37 PM 舒琦 <shuqi@eefung.com> wrote:
>>>>
>>>>> Hi Manu,
>>>>>
>>>>>
>>>>> Thanks for your help.
>>>>>
>>>>>
>>>>> I used kafka-console-consumer with the same ZooKeeper servers (zks) and
>>>>> topic, and it can consume messages. There are still lots of messages in
>>>>> that topic.
>>>>>
>>>>>
>>>>> Below is the function I used to create the Kafka source. Could you please
>>>>> help check whether it is OK? Thanks.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>  Original message
>>>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>>>> *To:* user<user@gearpump.incubator.apache.org>
>>>>> *Sent:* Tue, May 3, 2016, 12:40
>>>>> *Subject:* Re: Questions About Kafka Source
>>>>>
>>>>> Hi Qi,
>>>>>
>>>>> Neither the red balloon nor the message receive throughput means that
>>>>> any message has been consumed by KafkaSource. Those are messages the
>>>>> source sends to itself to trigger the next Task execution. The metrics are
>>>>> a bit confusing and I think we need to fix this.
>>>>>
>>>>> Yes, both the zookeeper servers and kafka brokers configs are
>>>>> comma-separated list strings. One way to check whether your configuration
>>>>> is correct is to consume from the topic using kafka-console-consumer. This
>>>>> also makes sure the topic has data to consume.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Manu
>>>>>
>>>>> On Tue, May 3, 2016 at 12:07 PM 舒琦 <shuqi@eefung.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> I constructed a DAG as shown below. The “kafka source” consumes messages
>>>>>> from the Kafka topic “webs”. Its metrics show that it consumes lots of
>>>>>> messages, but actually no messages are handled, and I also can’t find
>>>>>> an active group under topic “webs”. The log looks OK too.
>>>>>>
>>>>>>
>>>>>> I am wondering about the Kafka properties for zks and brokers: if there
>>>>>> is a list of ZooKeeper servers, should I separate them with commas, like
>>>>>> below?
>>>>>>
>>>>>>
>>>>>> zks=zk1:3181,zk2:3181,zk3:3181
>>>>>>
>>>>>> brokers=kfk1:9096,kfk2:9096,kfk3:9096
>>>>>>
>>>>>>
>>>>>> Thanks for your help.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ————————
>>>>>> Qi Shu
>>>>>>
>>>>>
