gearpump-user mailing list archives

From Manu Zhang <owenzhang1...@gmail.com>
Subject Re: Questions About Kafka Source
Date Wed, 04 May 2016 06:29:40 GMT
Hi Qi,

Could you also paste information about your Kafka topic? For example, the
output of "bin/kafka-topics --list --zookeeper zk11:3181,zk12:3181".
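Once that listing is available, the topic name can be checked mechanically. The following is a minimal sketch, assuming the listing prints one topic name per line; the class TopicListing and its methods are hypothetical helpers and are not part of Kafka or Gearpump. The sample strings in main are made up for illustration, using the topic names mentioned in this thread.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper, not part of Kafka or Gearpump. */
final class TopicListing {

    /** Collects the non-empty, trimmed lines of a topic listing (assumed one name per line). */
    static Set<String> parseTopics(String listOutput) {
        Set<String> topics = new LinkedHashSet<>();
        for (String line : listOutput.split("\\r?\\n")) {
            String name = line.trim();
            if (!name.isEmpty()) {
                topics.add(name);
            }
        }
        return topics;
    }

    /** Returns true if the exact topic name appears in the listing. */
    static boolean hasTopic(String listOutput, String topic) {
        return parseTopics(listOutput).contains(topic);
    }

    public static void main(String[] args) {
        // Made-up listing for illustration only.
        String sample = "webs\n";
        System.out.println("inputTopic listed: " + hasTopic(sample, "inputTopic"));
        System.out.println("webs listed: " + hasTopic(sample, "webs"));
    }
}
```

If the topic the application reads from is missing from the listing, the source has nothing to consume regardless of the other settings.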


On Wed, May 4, 2016 at 2:26 PM 舒琦 <shuqi@eefung.com> wrote:

> Hi Manu,
>
>
> I have already modified the Kafka configuration; the code is shown below:
>
>
>
> I know there must be something wrong on my side, but I am out of ideas
> for now.
>
> Do you have any other suggestions? Thanks.
>
>
> ————————
> 舒琦
> Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, 27 Wenxuan Road, Yuelu District, Changsha
> Website: http://www.eefung.com
> Weibo: http://weibo.com/eefung
> Postal code: 410013
> Tel: 400-677-0986
> Fax: 0731-88519609
>
>  Original message
> *From:* Manu Zhang<owenzhang1990@gmail.com>
> *To:* user<user@gearpump.incubator.apache.org>
> *Sent:* Wed, May 4, 2016 14:21
> *Subject:* Re: Questions About Kafka Source
>
> Hi Qi,
>
> The example hardcodes the topic to be "inputTopic". Please check.
> I've just verified that the kafka2kafka pipeline from gearpump-java-example
> 2.10-0.7.5 runs successfully on Gearpump 2.10-0.7.6. The screenshot is
> attached.
>
> On Wed, May 4, 2016 at 1:45 PM 舒琦 <shuqi@eefung.com> wrote:
>
>> Hi Manu,
>>
>>
>> I just found that the app I built was compiled against Gearpump 2.11-0.7.5. I’ll
>> rebuild it and try again.
>>
>>
>> Thanks.
>>
>>
>> ————————
>> 舒琦
>>
>>  Original message
>> *From:* 舒琦<shuqi@eefung.com>
>> *To:* user<user@gearpump.incubator.apache.org>
>> *Sent:* Wed, May 4, 2016 13:27
>> *Subject:* Re: Questions About Kafka Source
>>
>> Hi Manu,
>>
>>
>> Our Kafka cluster’s version is 2.10-0.8.2.0, and the command-line tools can
>> produce and consume messages on the same topic.
>>
>>
>> ===Test case A===
>>
>> Gearpump: 2.10-0.7.6
>>
>> app: kafka2kafka in gearpump-java-example
>>
>>
>> When I start the app, I get the following exception:
>>
>>
>> [ERROR] [05/04/2016 13:03:51.209] [ActorSystemImpl] Uncaught error from thread [app4system1-gearpump.shared-thread-pool-dispatcher-18] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
>> java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>     at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:547)
>>     at kafka.utils.ZkUtils$$anonfun$getPartitionAssignmentForTopics$1.apply(ZkUtils.scala:531)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at kafka.utils.ZkUtils$.getPartitionAssignmentForTopics(ZkUtils.scala:531)
>>     at kafka.utils.ZkUtils$.getPartitionsForTopics(ZkUtils.scala:553)
>>     at io.gearpump.streaming.kafka.lib.KafkaUtil$.getTopicAndPartitions(KafkaUtil.scala:57)
>>     at io.gearpump.streaming.kafka.KafkaSource.open(KafkaSource.scala:152)
>>     at io.gearpump.streaming.source.DataSourceTask.onStart(DataSourceTask.scala:49)
>>     at io.gearpump.streaming.task.TaskWrapper.onStart(TaskWrapper.scala:90)
>>     at io.gearpump.streaming.task.TaskActor.onStart(TaskActor.scala:101)
>>     at io.gearpump.streaming.task.TaskActor.io$gearpump$streaming$task$TaskActor$$onStartClock(TaskActor.scala:199)
>>     at io.gearpump.streaming.task.TaskActor$$anonfun$waitForStartClock$1.applyOrElse(TaskActor.scala:216)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>     at io.gearpump.streaming.task.TaskActor.aroundReceive(TaskActor.scala:41)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> ===Test case B===
>>
>> Gearpump: 2.11-0.7.6
>>
>> app: kafka2kafka in gearpump-java-example
>>
>> About 7 minutes after the app started, I got the following exception:
>>
>> [INFO] [05/04/2016 13:16:09.671] [ClientCnxn] EventThread shut down
>> [WARN] [05/04/2016 13:23:09.348] [Selector] Error in I/O with buka2/172.19.105.20
>> java.io.EOFException
>>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> I am a little confused about why Gearpump 2.10-0.7.6 throws this exception
>> while 2.11-0.7.6 does not, since it looks like it is caused by a Scala
>> version mismatch.
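A NoSuchMethodError on scala.Predef$.ArrowAssoc, as in test case A, typically shows up when jars compiled for different Scala binary versions (2.10 and 2.11) are mixed on one classpath. The following is a minimal sketch of a sanity check over the build strings quoted in this thread. It assumes the two notations used here ("2.10-0.7.6" with the Scala version first, "0.7.6_2.11" with it last); the class ScalaBuildCheck is a hypothetical helper and is not part of Gearpump or Kafka.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Gearpump or Kafka. */
final class ScalaBuildCheck {

    // "2.10-0.7.6": Scala binary version first, then the project version.
    private static final Pattern SCALA_FIRST =
            Pattern.compile("^(2\\.\\d+)-\\d+(\\.\\d+)*$");
    // "0.7.6_2.11": project version first, then the Scala binary version.
    private static final Pattern SCALA_LAST =
            Pattern.compile("^\\d+(\\.\\d+)*_(2\\.\\d+)$");

    /** Extracts the Scala binary version ("2.10", "2.11") from a build string. */
    static String scalaBinaryVersion(String build) {
        String trimmed = build.trim();
        Matcher first = SCALA_FIRST.matcher(trimmed);
        if (first.matches()) {
            return first.group(1);
        }
        Matcher last = SCALA_LAST.matcher(trimmed);
        if (last.matches()) {
            return last.group(2);
        }
        throw new IllegalArgumentException("Unrecognized build string: " + build);
    }

    /** Returns true only if every build string carries the same Scala binary version. */
    static boolean sameScalaBinaryVersion(String... builds) {
        if (builds.length == 0) {
            return true;
        }
        String expected = scalaBinaryVersion(builds[0]);
        for (String build : builds) {
            if (!expected.equals(scalaBinaryVersion(build))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Cluster 2.10-0.7.6 with Kafka 2.10-0.8.2.0 (both Scala 2.10).
        System.out.println(sameScalaBinaryVersion("2.10-0.7.6", "2.10-0.8.2.0"));
        // Cluster 2.10-0.7.6 with an app built against 2.11-0.7.5 (mixed).
        System.out.println(sameScalaBinaryVersion("2.10-0.7.6", "2.11-0.7.5"));
    }
}
```

The check only compares version labels; it does not inspect the jars that actually end up on the classpath, so it is a first filter rather than proof of compatibility.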
>>
>> Thanks.
>>
>> ————————
>> 舒琦
>>
>>  Original message
>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>> *To:* user<user@gearpump.incubator.apache.org>
>> *Sent:* Tue, May 3, 2016 16:55
>> *Subject:* Re: Questions About Kafka Source
>>
>> Hi Qi,
>>
>> I don't see anything suspicious in the log. Could you try out the
>> https://github.com/gearpump/gearpump-java-example/tree/master/src/main/java/kafka2kafka
>> example to see whether it's a framework bug?
>>
>> "group-id" is set to "gearpump" if not configured by the user. If you want to
>> configure "group-id", you may create a KafkaSource like this:
>>
>>     Properties properties = new Properties();
>>     properties.put("group-id", "my-group");
>>     properties.put("zookeeper.servers", "localhost:2181");
>>     KafkaSource source = new KafkaSource("topic", properties, storageFactory);
>>
>>
>> On Tue, May 3, 2016 at 3:13 PM 舒琦 <shuqi@eefung.com> wrote:
>>
>>> Hi Manu,
>>>
>>>
>>> Could you also help me check the log in the attachment?
>>>
>>>
>>> How can I specify a group id when using the Kafka source? For now I just set
>>> “group.id=XXX” in UserConfig.
>>>
>>>
>>> Thanks.
>>>
>>>
>>> ————————
>>> 舒琦
>>>
>>>  Original message
>>> *From:* 舒琦<shuqi@eefung.com>
>>> *To:* user<user@gearpump.incubator.apache.org>
>>> *Sent:* Tue, May 3, 2016 14:19
>>> *Subject:* Re: Questions About Kafka Source
>>>
>>> Hi Manu,
>>>
>>>
>>> Gearpump: 0.7.6_2.11
>>>
>>> Kafka: 0.8.2.1_2.10.
>>>
>>>
>>> Thanks.
>>>
>>> ————————
>>> 舒琦
>>>
>>>  Original message
>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>> *To:* user<user@gearpump.incubator.apache.org>
>>> *Sent:* Tue, May 3, 2016 14:13
>>> *Subject:* Re: Questions About Kafka Source
>>>
>>> Hi Qi,
>>>
>>> Your code looks right. Which Gearpump and Kafka versions are you
>>> using?
>>>
>>>
>>>
>>> On Tue, May 3, 2016 at 1:37 PM 舒琦 <shuqi@eefung.com> wrote:
>>>
>>>> Hi Manu,
>>>>
>>>>
>>>> Thanks for your help.
>>>>
>>>>
>>>> I used kafka-console-consumer with the same zks and topic, and it
>>>> can consume messages. There are still lots of messages in that topic.
>>>>
>>>>
>>>> Below is the function I use to get the Kafka source; could you please
>>>> help check whether it is OK? Thanks.
>>>>
>>>>
>>>>
>>>>
>>>> ————————
>>>> 舒琦
>>>>
>>>>  Original message
>>>> *From:* Manu Zhang<owenzhang1990@gmail.com>
>>>> *To:* user<user@gearpump.incubator.apache.org>
>>>> *Sent:* Tue, May 3, 2016 12:40
>>>> *Subject:* Re: Questions About Kafka Source
>>>>
>>>> Hi Qi,
>>>>
>>>> Neither the red balloon nor the message receive throughput means that
>>>> any message has been consumed by KafkaSource. Those are messages the source
>>>> sends to itself to trigger the next Task execution. The metrics are a bit
>>>> confusing, and I think we need to fix this.
>>>>
>>>> Yes, both the ZooKeeper servers and Kafka brokers configs are
>>>> comma-separated list strings. One way to check whether your configuration
>>>> is correct is to consume from the topic using kafka-console-consumer. This
>>>> also makes sure the topic has data to consume.
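Before trying the console consumer, the comma-separated format itself can be checked offline. The following is a minimal sketch, assuming every entry is a plain host:port pair with no ZooKeeper chroot path; the class HostPortList is a hypothetical helper and is not part of Gearpump or Kafka. It only validates the shape of the string and does not contact any server.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Gearpump or Kafka. */
final class HostPortList {

    /** Parses "host1:port1,host2:port2" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String csv) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : csv.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            // Reject hosts containing separators such as ';' or ':' (a sign of a wrong delimiter).
            if (!host.matches("[A-Za-z0-9.-]+")) {
                throw new IllegalArgumentException("Invalid host in '" + trimmed + "'");
            }
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in '" + trimmed + "'", e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
            }
            // createUnresolved avoids any DNS lookup; this is a format check only.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the first message in this thread.
        for (InetSocketAddress address : parse("zk1:3181,zk2:3181,zk3:3181")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
        System.out.println(parse("kfk1:9096,kfk2:9096,kfk3:9096").size() + " brokers parsed");
    }
}
```

A string that passes this check can still point at the wrong cluster, which is why consuming with kafka-console-consumer remains the more telling test.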
>>>>
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Manu
>>>>
>>>> On Tue, May 3, 2016 at 12:07 PM 舒琦 <shuqi@eefung.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I constructed a DAG as shown below. The “kafka source” consumes messages
>>>>> from the Kafka topic “webs”, and its metrics show that it consumes lots of
>>>>> messages, but no messages are actually handled, and I can’t find an
>>>>> active group under topic “webs” either. The log looks fine too.
>>>>>
>>>>>
>>>>> I am wondering about the Kafka properties for zks and brokers: if there is
>>>>> a list of ZooKeeper servers, should I separate them with commas, like
>>>>> below?
>>>>>
>>>>>
>>>>> zks=zk1:3181,zk2:3181,zk3:3181
>>>>>
>>>>> brokers=kfk1:9096,kfk2:9096,kfk3:9096
>>>>>
>>>>>
>>>>> Thanks for your help.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ————————
>>>>> Qi Shu
>>>>>
>>>>
