flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Vieru, Mihail" <mihail.vi...@zalando.de>
Subject NPE with Flink Streaming from Kafka
Date Tue, 01 Dec 2015 15:54:35 GMT
Hi,

we get the following NullPointerException after ~50 minutes when running a
streaming job with windowing and state that reads data from Kafka and
writes the result to local FS.
There are around 170 million messages to be processed, Flink 0.10.1 stops
at ~8 million.
Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24    Job execution switched to status RUNNING.
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to
SCHEDULED
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to
DEPLOYING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
SCHEDULED
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
DEPLOYING
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to
RUNNING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
RUNNING
12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
CANCELED
12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to
FAILED
java.lang.Exception
    at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at
org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)


Any ideas on what could cause this behaviour?

Best,
Mihail

Mime
View raw message