From: Robert Metzger
To: user@flink.apache.org
Date: Mon, 7 Sep 2015 10:43:50 +0200
Subject: Re: NPE thrown when using Storm Kafka Spout with Flink

Hi Jerry,

the issue occurs because Flink's Storm compatibility layer currently does not support custom configuration parameters. There is a JIRA issue which aims to add the missing feature to Flink:
https://issues.apache.org/jira/browse/FLINK-2525

Maybe (but it's unlikely) passing an empty Map instead of the null in this call in AbstractStormSpoutWrapper:

    this.spout.open(null,
            StormWrapperSetupHelper.convertToTopologyContext(
                    (StreamingRuntimeContext) super.getRuntimeContext(), true),
            new SpoutOutputCollector(this.collector));

would fix the issue. But I suspect that the KafkaSpout needs some configuration parameters, so we have to wait for FLINK-2525.
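For illustration only, that change could look roughly like the following (untested sketch; the empty HashMap is just a placeholder and does not give the spout any real configuration values):

    // Sketch: pass an empty conf Map instead of null, so that spouts which
    // copy the map in open() (as KafkaSpout does) no longer hit an NPE.
    this.spout.open(new java.util.HashMap<String, Object>(),
            StormWrapperSetupHelper.convertToTopologyContext(
                    (StreamingRuntimeContext) super.getRuntimeContext(), true),
            new SpoutOutputCollector(this.collector));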
Best,
Robert


On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng wrote:
> Hello,
>
> When I try to run a Storm topology with a Kafka spout on top of Flink, I
> get an NPE at:
>
> 15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  - Error closing stream operators after an exception.
> java.lang.NullPointerException
>     at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
>     at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
>
> 15:00:32,855 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - State backend for state checkpoints is set to jobmanager.
> 15:00:32,855 INFO  org.apache.flink.runtime.taskmanager.Task  - event_deserializer (5/5) switched to RUNNING
> 15:00:32,859 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: ads (1/1) switched to FAILED with exception.
> java.lang.NullPointerException
>     at java.util.HashMap.putMapEntries(HashMap.java:500)
>     at java.util.HashMap.<init>(HashMap.java:489)
>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
>     at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
>
> Has someone seen this before, or have a fix? I am using 0.10beta1 for all
> Storm packages and a 0.10-SNAPSHOT (latest compiled) for all Flink
> packages. A sample of the Kafka code I am using:
>
> Broker broker = new Broker("localhost", 9092);
> GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
> partitionInfo.addPartition(0, broker);
> StaticHosts hosts = new StaticHosts(partitionInfo);
>
> SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", UUID.randomUUID().toString());
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> builder.setSpout("kafkaSpout", kafkaSpout, 1);
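For reference, a minimal standalone sketch of the failure mode in the trace above (the class name is only for the example; it assumes nothing beyond what the trace shows, namely that KafkaSpout.open() copies the conf map it is given, so a null conf fails in HashMap's copy constructor):

    import java.util.HashMap;
    import java.util.Map;

    public class NullConfDemo {
        public static void main(String[] args) {
            Map<String, Object> conf = null;  // what the wrapper currently passes to open()
            // Copying a null map throws exactly the second NPE in the trace:
            // java.lang.NullPointerException at java.util.HashMap.putMapEntries(...)
            Map<String, Object> copy = new HashMap<>(conf);
            System.out.println(copy);
        }
    }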