flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jerry Peng <jerry.boyang.p...@gmail.com>
Subject NPE thrown when using Storm Kafka Spout with Flink
Date Wed, 02 Sep 2015 17:58:29 GMT
Hello,

When I try to run a storm topology with a Kafka Spout on top of Flink, I
get an NPE at:

15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
      - Error closing stream operators after an exception.

java.lang.NullPointerException

at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)

at
org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)

at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)

at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)

at java.lang.Thread.run(Thread.java:745)

15:00:32,855 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend for state checkpoints is set to jobmanager.

15:00:32,855 INFO  org.apache.flink.runtime.taskmanager.Task
      - event_deserializer (5/5) switched to RUNNING

15:00:32,859 INFO  org.apache.flink.runtime.taskmanager.Task
      - Source: ads (1/1) switched to FAILED with exception.

java.lang.NullPointerException

at java.util.HashMap.putMapEntries(HashMap.java:500)

at java.util.HashMap.<init>(HashMap.java:489)

at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)

at
org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)

at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)

at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)

at java.lang.Thread.run(Thread.java:745)


Has someone seen this before? or Have a fix?  I am using 0.10beta1 for all
storm packages and a 0.10-snapshot (latest compiled) for all flink
packages.  Sample of the kafka code I am using:

Broker broker = new Broker("localhost", 9092);
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, broker);
StaticHosts hosts = new StaticHosts(partitionInfo);

SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff",
UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

builder.setSpout("kafkaSpout", kafkaSpout, 1);

Mime
View raw message