Subject: Re: NPE with Flink Streaming from Kafka
From: "Vieru, Mihail" <mihail.vieru@zalando.de>
To: user@flink.apache.org
Date: Wed, 2 Dec 2015 10:18:26 +0100

Thank you, Robert!
The issue with Kafka is now solved with the 0.10-SNAPSHOT dependency.

We have run into an OutOfMemory exception though, which appears to be related to the state. As my colleague, Javier Lopez, mentioned in a previous thread, state handling is crucial for our use case. And as the jobs are intended to run for months, stability plays an important role in choosing a stream processing framework.

12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:662)
    at java.util.HashMap.put(HashMap.java:611)
    at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
    at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
    at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

2015-12-01 17:42 GMT+01:00 Maximilian Michels <mxm@apache.org>:
> Thanks! I've linked the issue in JIRA.
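[Editor's note: the HashMap frames at the top of the trace point at the in-memory heap state backend growing one entry per distinct key. A minimal plain-Java sketch of that failure mode follows; the class and method names are illustrative only, not Flink's actual implementation.]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch (not Flink code): the 0.10 heap state backend keeps one state
// entry per distinct key in an in-memory HashMap, much like this class.
// With an unbounded key space the map grows without bound until the JVM
// heap is exhausted -- the HashMap.resize() frames in the
// OutOfMemoryError stack trace above are exactly this growth path.
public class HeapStateGrowthDemo {
    private final Map<String, Double> kvState = new HashMap<>();

    // Mimics updating keyed reduce state: fold the new value into the
    // value already held for the record's key.
    public void update(String key, double value) {
        kvState.merge(key, value, Double::sum);
    }

    public int stateSize() {
        return kvState.size();
    }

    public static void main(String[] args) {
        HeapStateGrowthDemo demo = new HeapStateGrowthDemo();
        // A bounded key space keeps the state size constant (10 keys here),
        // no matter how many records arrive...
        for (int i = 0; i < 100_000; i++) {
            demo.update("key-" + (i % 10), 1.0);
        }
        System.out.println("distinct keys kept in state: " + demo.stateSize());
        // ...but with (near-)unique keys per record, state grows linearly
        // with the input, which on a long-running stream exhausts the heap.
    }
}
```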
>
> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetzger@apache.org> wrote:
> > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
> >
> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org> wrote:
> >>
> >> I know this has been fixed already but, out of curiosity, could you
> >> point me to the Kafka JIRA issue for this bug? From the Flink issue it
> >> looks like this is a Zookeeper version mismatch.
> >>
> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetzger@apache.org>
> >> wrote:
> >> > Hi Gyula,
> >> >
> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> >> > "release-0.10" branch to Apache's maven snapshot repository.
> >> >
> >> > I don't think Mihail's code will run when he's compiling it against
> >> > 1.0-SNAPSHOT.
> >> >
> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I think Robert meant to write setting the connector dependency to
> >> >> 1.0-SNAPSHOT.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> Robert Metzger <rmetzger@apache.org> wrote (on 2015 Dec 1, Tue,
> >> >> 17:10):
> >> >>>
> >> >>> Hi Mihail,
> >> >>>
> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
> >> >>> as well: https://issues.apache.org/jira/browse/FLINK-3067
> >> >>>
> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
> >> >>> a fix.
> >> >>>
> >> >>> Since the kafka connector is not contained in the flink binary, you
> >> >>> can just set the version in your maven pom file to 0.10-SNAPSHOT.
> >> >>> Maven will then download the code planned for the 0.10-SNAPSHOT
> >> >>> release.
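[Editor's note: the pom change Robert describes would amount to a dependency entry roughly like the following. The coordinates are a best-effort sketch for the 0.10 line, not taken from this thread — verify the exact artifactId against the Flink 0.10 documentation. Declaring the Apache snapshot repository may also be necessary, since SNAPSHOT artifacts are not on Maven Central.]

```xml
<!-- Hypothetical sketch; check the Flink 0.10 docs for exact coordinates. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.10-SNAPSHOT</version>
</dependency>

<!-- SNAPSHOT builds are served from the Apache snapshot repository: -->
<repositories>
    <repository>
        <id>apache.snapshots</id>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>
```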
> >> >>>
> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
> >> >>> <mihail.vieru@zalando.de>
> >> >>> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> we get the following NullPointerException after ~50 minutes when
> >> >>>> running a streaming job with windowing and state that reads data
> >> >>>> from Kafka and writes the result to local FS.
> >> >>>> There are around 170 million messages to be processed, Flink 0.10.1
> >> >>>> stops at ~8 million.
> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
> >> >>>> script.
> >> >>>>
> >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1)
> >> >>>> switched to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1)
> >> >>>> switched to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> >> >>>> to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> >> >>>> to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1)
> >> >>>> switched to RUNNING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> >> >>>> to RUNNING
> >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
> >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> >> >>>> to CANCELED
> >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1)
> >> >>>> switched to FAILED
> >> >>>> java.lang.Exception
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >> >>>>     at java.lang.Thread.run(Thread.java:745)
> >> >>>> Caused by: java.lang.NullPointerException
> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
> >> >>>>
> >> >>>> Any ideas on what could cause this behaviour?
> >> >>>>
> >> >>>> Best,
> >> >>>> Mihail