flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: NPE with Flink Streaming from Kafka
Date Tue, 01 Dec 2015 16:39:31 GMT
I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824

On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org> wrote:

> I know this has been fixed already but, out of curiosity, could you
> point me to the Kafka JIRA issue for this bug? From the Flink issue it
> looks like this is a ZooKeeper version mismatch.
>
> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
> > Hi Gyula,
> >
> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> > "release-0.10" branch to Apache's maven snapshot repository.
> >
> >
> > I don't think Mihail's code will run when he's compiling it against
> > 1.0-SNAPSHOT.
> >
> >
> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I think Robert meant to say that you should set the connector
> >> dependency to 1.0-SNAPSHOT.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> Robert Metzger <rmetzger@apache.org> wrote (on Tue, Dec 1, 2015, 17:10):
> >>>
> >>> Hi Mihail,
> >>>
> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
> >>> as well: https://issues.apache.org/jira/browse/FLINK-3067
> >>>
> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
> >>> a fix.
> >>>
> >>> Since the Kafka connector is not contained in the Flink binary, you can
> >>> just set the version in your Maven pom file to 0.10-SNAPSHOT. Maven will
> >>> then download the code planned for the 0.10-SNAPSHOT release.
> >>>
> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vieru@zalando.de>
> >>> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> We get the following NullPointerException after ~50 minutes when running
> >>>> a streaming job with windowing and state that reads data from Kafka and
> >>>> writes the result to the local file system.
> >>>> There are around 170 million messages to be processed; Flink 0.10.1
> >>>> stops at ~8 million.
> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
> >>>> script.
> >>>>
> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> >>>> java.lang.Exception
> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.NullPointerException
> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
> >>>>
> >>>>
> >>>> Any ideas on what could cause this behaviour?
> >>>>
> >>>> Best,
> >>>> Mihail
> >>>
> >>>
> >
>
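
For reference, the workaround quoted above (setting the Kafka connector version to 0.10-SNAPSHOT in the Maven pom file) might look roughly like the fragment below. This is only a sketch: the artifact ID flink-connector-kafka, the repository id, and the snapshot repository URL are assumptions that are not stated anywhere in this thread, so check them against your own build before relying on them.

```xml
<!-- Sketch only: the coordinates and the repository URL below are
     assumptions and are not taken from the thread. -->
<repositories>
  <repository>
    <!-- Assumed location of Apache's Maven snapshot repository -->
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Assumed artifact ID of the Kafka connector -->
    <artifactId>flink-connector-kafka</artifactId>
    <!-- Snapshot deployed from the "release-0.10" branch, as described above -->
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>
```

Since the thread says Flink 0.10.2 will contain the fix, it should be possible to switch the version to that release once it is available and drop the snapshot repository entry.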
