flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: NPE thrown when using Storm Kafka Spout with Flink
Date Mon, 07 Sep 2015 08:43:50 GMT
Hi Jerry,

the issue occurs because Flink's Storm compatibility layer does not
currently support custom configuration parameters.
There is a JIRA issue which aims to add the missing feature to Flink:
https://issues.apache.org/jira/browse/FLINK-2525

Maybe (though it's unlikely) passing an empty Map instead of null in
AbstractStormSpoutWrapper:

this.spout.open(null,
        StormWrapperSetupHelper.convertToTopologyContext(
                (StreamingRuntimeContext) super.getRuntimeContext(), true),
        new SpoutOutputCollector(this.collector));

would fix the issue. But I suspect that the KafkaSpout needs some real
configuration parameters, so we will have to wait for FLINK-2525.
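
Concretely, the change would look roughly like this (a sketch only, not
tested: the empty java.util.HashMap stands in for the null config, since the
trace below shows KafkaSpout.open() copying the config into a new HashMap,
and new HashMap(null) throws exactly that NullPointerException):

// in AbstractStormSpoutWrapper: pass an empty config map instead of null,
// so KafkaSpout.open() does not NPE while copying it into its own HashMap
this.spout.open(new HashMap<String, Object>(),
        StormWrapperSetupHelper.convertToTopologyContext(
                (StreamingRuntimeContext) super.getRuntimeContext(), true),
        new SpoutOutputCollector(this.collector));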

Best,
Robert


On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng <jerry.boyang.peng@gmail.com>
wrote:

> Hello,
>
> When I try to run a Storm topology with a Kafka spout on top of Flink, I
> get an NPE:
>
> 15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error closing stream operators after an exception.
> java.lang.NullPointerException
>     at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
>     at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
>
> 15:00:32,855 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend for state checkpoints is set to jobmanager.
> 15:00:32,855 INFO  org.apache.flink.runtime.taskmanager.Task - event_deserializer (5/5) switched to RUNNING
> 15:00:32,859 INFO  org.apache.flink.runtime.taskmanager.Task - Source: ads (1/1) switched to FAILED with exception.
> java.lang.NullPointerException
>     at java.util.HashMap.putMapEntries(HashMap.java:500)
>     at java.util.HashMap.<init>(HashMap.java:489)
>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
>     at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Has anyone seen this before, or does anyone have a fix? I am using
> 0.10-beta1 for all Storm packages and 0.10-SNAPSHOT (latest compiled) for
> all Flink packages. Here is a sample of the Kafka code I am using:
>
> // Statically configured Kafka: a single broker on localhost:9092 owning partition 0
> Broker broker = new Broker("localhost", 9092);
> GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
> partitionInfo.addPartition(0, broker);
> StaticHosts hosts = new StaticHosts(partitionInfo);
>
> // Consume topic "stuff", deserializing each message as a plain string
> SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", UUID.randomUUID().toString());
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> // Register the spout with the topology builder, parallelism 1
> builder.setSpout("kafkaSpout", kafkaSpout, 1);
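>
> Submission is roughly as follows (a sketch from memory; it assumes
> "builder" above is a FlinkTopologyBuilder and uses the FlinkLocalCluster
> class of the flink-storm-compatibility module; my actual code may differ):
>
> // build the Flink-wrapped topology and run it on a local cluster;
> // the topology name "kafka-npe-test" is arbitrary
> Config conf = new Config();
> FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> cluster.submitTopology("kafka-npe-test", conf, builder.createTopology());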
>
>
