gearpump-user mailing list archives

From Manu Zhang <owenzhang1...@gmail.com>
Subject Re: Questions About Kafka Source
Date Wed, 04 May 2016 06:21:18 GMT
Hi Qi,

The example hardcodes the topic to "inputTopic", so please check that your
topic matches. I've just verified that the kafka2kafka pipeline from
gearpump-java-example 2.10-0.7.5 runs successfully on gearpump 2.10-0.7.6; the
screenshot is attached.
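
In case it helps, here is a minimal sketch of that check, reusing the
KafkaSource(topic, properties, storageFactory) construction quoted further down
in this thread; the topic, server list, and the storageFactory placeholder are
only illustrative assumptions, not the exact kafka2kafka code:

import java.util.Properties;
import io.gearpump.streaming.kafka.KafkaSource;

// Replace the hardcoded "inputTopic" with the topic you actually produce to.
// "zookeeper.servers" and "group-id" are the property keys used elsewhere in
// this thread; storageFactory stands in for whatever the example already builds.
Properties properties = new Properties();
properties.put("zookeeper.servers", "zk1:3181,zk2:3181,zk3:3181");
properties.put("group-id", "my-group");
KafkaSource source = new KafkaSource("webs", properties, storageFactory);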

On Wed, May 4, 2016 at 1:45 PM 舒琦 <shuqi@eefung.com> wrote:

> Hi Manu,
>
>
> Just found that the app I built is against gearpump 2.11-0.7.5; I'll
> rebuild it and try again.
>
>
> Thanks.
>
>
> ————————
> 舒琦 (Qi Shu)
> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
> Website: http://www.eefung.com
> Weibo: http://weibo.com/eefung
> Postal code: 410013
> Tel: 400-677-0986
> Fax: 0731-88519609
>
>  Original Message
> *From:* 舒琦<shuqi@eefung.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* May 4, 2016 (Wed) 13:27
> *Subject:* Re: Questions About Kafka Source
>
> Hi Manu,
>
>
> Our Kafka cluster's version is 2.10-0.8.2.0, and the command line tools can
> produce and consume messages from the same topic.
>
>
> ===Test case A===
>
> Gearpump: 2.10-0.7.6
>
> app: kafka2kafka in gearpump-java-example
>
>
> When starting the app, I got the exception below:
>
>
> [ERROR] [05/04/2016 13:03:51.209] [ActorSystemImpl] Uncaught error from
> thread [app4system1-gearpump.shared-thread-pool-dispatcher-18] shutting
> down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
> java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:547)
> at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:531)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at kafka.utils.ZkUtils$.getPartitionAssignmentForTopics(ZkUtils.scala:531)
> at kafka.utils.ZkUtils$.getPartitionsForTopics(ZkUtils.scala:553)
> at io.gearpump.streaming.kafka.lib.KafkaUtil$.getTopicAndPartitions(KafkaUtil.scala:57)
> at io.gearpump.streaming.kafka.KafkaSource.open(KafkaSource.scala:152)
> at io.gearpump.streaming.source.DataSourceTask.onStart(DataSourceTask.scala:49)
> at io.gearpump.streaming.task.TaskWrapper.onStart(TaskWrapper.scala:90)
> at io.gearpump.streaming.task.TaskActor.onStart(TaskActor.scala:101)
> at io.gearpump.streaming.task.TaskActor.io$gearpump$streaming$task$TaskActor$$onStartClock(TaskActor.scala:199)
> at io.gearpump.streaming.task.TaskActor$$anonfun$waitForStartClock$1.applyOrElse(TaskActor.scala:216)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at io.gearpump.streaming.task.TaskActor.aroundReceive(TaskActor.scala:41)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> ===Test case B===
>
> Gearpump: 2.11-0.7.6
>
> app: kafka2kafka in gearpump-java-example
>
> About seven minutes after the app started, I got the exception below:
>
> [INFO] [05/04/2016 13:16:09.671] [ClientCnxn] EventThread shut down
> [WARN] [05/04/2016 13:23:09.348] [Selector] Error in I/O with buka2/172.19.105.20
> java.io.EOFException
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> at java.lang.Thread.run(Thread.java:745)
>
> I am a little confused about why gearpump 2.10-0.7.6 throws such an
> exception while 2.11-0.7.6 does not, since the error looks like it is caused
> by a Scala version mismatch.
>
> Thanks.
>
> ————————
> 舒琦 (Qi Shu)
> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
> Website: http://www.eefung.com
> Weibo: http://weibo.com/eefung
> Postal code: 410013
> Tel: 400-677-0986
> Fax: 0731-88519609
>
>  Original Message
> *From:* Manu Zhang<owenzhang1990@gmail.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* May 3, 2016 (Tue) 16:55
> *Subject:* Re: Questions About Kafka Source
>
> Hi Qi,
>
> I don't see anything suspicious in the log. Could you try out the
> https://github.com/gearpump/gearpump-java-example/tree/master/src/main/java/kafka2kafka
> example to see whether it's a framework bug?
>
> "group-id" is set to "gearpump" if not configured by user. If you want to
> configure "group-id", you may create KafkaSource like
>
> Properties properties = new Properties();
> properties.put("group-id", "my-group");
> properties.put("zookeeper.servers", "localhost:2181");
> KafkaSource source = new KafkaSource("topic", properties, storageFactory);
>
>
> On Tue, May 3, 2016 at 3:13 PM 舒琦 <shuqi@eefung.com> wrote:
>
>> Hi Manu,
>>
>>
>> Could you also help me to check the log in the attachment.
>>
>>
>> How can I specify a group id when using the Kafka Source? For now I just
>> set "group.id=XXX" in UserConfig.
>>
>>
>> Thanks.
>>
>>
>> ————————
>> 舒琦 (Qi Shu)
>> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
>> Website: http://www.eefung.com
>> Weibo: http://weibo.com/eefung
>> Postal code: 410013
>> Tel: 400-677-0986
>> Fax: 0731-88519609
>>
>>  Original Message
>> *From:* 舒琦<shuqi@eefung.com>
>> *To:* user<user@gearpump.incubator.apache.org>
>> *Sent:* May 3, 2016 (Tue) 14:19
>> *Subject:* Re: Questions About Kafka Source
>>
>> Hi Manu,
>>
>>
>> Gearpump: 0.7.6_2.11
>>
>> Kafka: 0.8.2.1_2.10.
>>
>>
>> Thanks.
>>
>> ————————
>> 舒琦 (Qi Shu)
>> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
>> Website: http://www.eefung.com
>> Weibo: http://weibo.com/eefung
>> Postal code: 410013
>> Tel: 400-677-0986
>> Fax: 0731-88519609
>>
>>  Original Message
>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>> *To:* user<user@gearpump.incubator.apache.org>
>> *Sent:* May 3, 2016 (Tue) 14:13
>> *Subject:* Re: Questions About Kafka Source
>>
>> Hi Qi,
>>
>> Your code looks right. Which gearpump version and which Kafka version have
>> you used?
>>
>>
>>
>> On Tue, May 3, 2016 at 1:37 PM 舒琦 <shuqi@eefung.com> wrote:
>>
>>> Hi Manu,
>>>
>>>
>>> Thanks for your help.
>>>
>>>
>>> I used kafka-console-consumer with the same zks and topic, and it can
>>> consume messages. There are still lots of messages in that topic.
>>>
>>>
>>> Below is the function I used to create the Kafka Source. Could you please
>>> help check whether it is OK? Thanks.
>>>
>>>
>>>
>>>
>>> ————————
>>> 舒琦 (Qi Shu)
>>> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
>>> Website: http://www.eefung.com
>>> Weibo: http://weibo.com/eefung
>>> Postal code: 410013
>>> Tel: 400-677-0986
>>> Fax: 0731-88519609
>>>
>>>  Original Message
>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>> *To:* user<user@gearpump.incubator.apache.org>
>>> *Sent:* May 3, 2016 (Tue) 12:40
>>> *Subject:* Re: Questions About Kafka Source
>>>
>>> Hi Qi,
>>>
>>> Neither the red balloon nor the receive-message throughput means that any
>>> message has been consumed by the KafkaSource. Those are messages the source
>>> sends to itself to trigger the next Task execution. The metrics are a bit
>>> confusing and I think we need to fix this.
>>>
>>> Yes, both the zookeeper servers and the kafka brokers configs are
>>> comma-separated list strings. One way to check whether your configuration
>>> is correct is to consume from the topic using kafka-console-consumer. This
>>> also makes sure the topic has data to consume.
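>>>
>>> A minimal Java sketch of the same check against the Kafka 0.8.2 high-level
>>> consumer API, assuming the "webs" topic and zookeeper list mentioned in this
>>> thread; it just reads one message to confirm the connection string and topic
>>> name are usable:
>>>
>>> import java.util.Collections;
>>> import java.util.List;
>>> import java.util.Map;
>>> import java.util.Properties;
>>> import kafka.consumer.Consumer;
>>> import kafka.consumer.ConsumerConfig;
>>> import kafka.consumer.ConsumerIterator;
>>> import kafka.consumer.KafkaStream;
>>> import kafka.javaapi.consumer.ConsumerConnector;
>>>
>>> public class TopicCheck {
>>>   public static void main(String[] args) {
>>>     Properties props = new Properties();
>>>     // Comma-separated zookeeper servers, as described above.
>>>     props.put("zookeeper.connect", "zk1:3181,zk2:3181,zk3:3181");
>>>     props.put("group.id", "config-check");
>>>     // Read from the earliest offset so existing messages are visible.
>>>     props.put("auto.offset.reset", "smallest");
>>>     ConsumerConnector consumer =
>>>         Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>>>     Map<String, List<KafkaStream<byte[], byte[]>>> streams =
>>>         consumer.createMessageStreams(Collections.singletonMap("webs", 1));
>>>     ConsumerIterator<byte[], byte[]> it = streams.get("webs").get(0).iterator();
>>>     if (it.hasNext()) {
>>>       System.out.println("topic has data: " + new String(it.next().message()));
>>>     }
>>>     consumer.shutdown();
>>>   }
>>> }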
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Manu
>>>
>>> On Tue, May 3, 2016 at 12:07 PM 舒琦 <shuqi@eefung.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> I constructed a DAG as shown below. The "kafka source" consumes messages
>>>> from the Kafka topic "webs". Its metrics show that it has consumed lots of
>>>> messages, but actually no messages are handled, and I also can't find an
>>>> active consumer group under the topic "webs". The log looks fine too.
>>>>
>>>>
>>>> I am just wondering about the Kafka properties for zks and brokers: if
>>>> there is a list of zookeeper servers, should I separate them with commas,
>>>> like below?
>>>>
>>>>
>>>> zks=zk1:3181,zk2:3181,zk3:3181
>>>>
>>>> brokers=kfk1:9096,kfk2:9096,kfk3:9096
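>>>>
>>>> A tiny Java sketch, assuming the app simply splits these comma-separated
>>>> values before handing them to the zookeeper/Kafka clients; the class name is
>>>> only illustrative:
>>>>
>>>> import java.util.Arrays;
>>>> import java.util.List;
>>>>
>>>> public class ServerListSketch {
>>>>   public static void main(String[] args) {
>>>>     // The comma-separated host:port lists from the properties above.
>>>>     String zks = "zk1:3181,zk2:3181,zk3:3181";
>>>>     String brokers = "kfk1:9096,kfk2:9096,kfk3:9096";
>>>>     List<String> zkServers = Arrays.asList(zks.split(","));
>>>>     List<String> brokerList = Arrays.asList(brokers.split(","));
>>>>     System.out.println(zkServers + " / " + brokerList);
>>>>   }
>>>> }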
>>>>
>>>>
>>>> Thanks for your help.
>>>>
>>>>
>>>>
>>>>
>>>> ————————
>>>> Qi Shu
>>>>
>>>
