Subject: Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state
From: "Federico D'Ambrosio" <federico.dambrosio@smartlab.ws>
Date: Mon, 2 Oct 2017 11:57:30 +0200
To: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Cc: user <user@flink.apache.org>, Aljoscha Krettek, Stefan Richter

As a follow-up: the Flink job currently has an uptime of almost 24 hours, with no failed checkpoints or restarts, whereas with async snapshots it would already have crashed 50 or so times.

Regards,
Federico

2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:

> Thank you very much, Gordon.
>
> I'll try to run the job without the asynchronous snapshots first thing.
>
> As for the Event data type: it's a case class with 2 fields: a String ID
> and a composite case class (let's call it RealEvent) containing 3 fields
> of the following types: Information, which is a case class with String
> fields; Coordinates, a nested case class with 2 Doubles; and
> InstantValues, with 3 Integers and a DateTime. This DateTime field in
> InstantValues is the one being evaluated in the maxBy (via the
> InstantValues and RealEvent compareTo implementations, because dot
> notation does not work in Scala as of 1.3.2, FLINK-7629), and it was the
> reason I had to register the JodaDateTimeSerializer with Kryo in the
> first place.
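>
> A rough sketch of that hierarchy, for reference (all names except `time`
> are illustrative placeholders, not the actual field names):
>
>     import org.joda.time.DateTime
>
>     case class Information(name: String, source: String)
>     case class Coordinates(lat: Double, lon: Double)
>
>     case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
>         extends Ordered[InstantValues] {
>       // compared by the DateTime field, since FLINK-7629 rules out
>       // dot-notation access to nested fields in 1.3.2
>       override def compare(that: InstantValues): Int =
>         this.time.compareTo(that.time)
>     }
>
>     case class RealEvent(information: Information,
>                          coordinates: Coordinates,
>                          instantValues: InstantValues)
>         extends Ordered[RealEvent] {
>       override def compare(that: RealEvent): Int =
>         this.instantValues.compare(that.instantValues)
>     }
>
>     case class Event(id: String, event: RealEvent)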
>
> Regards,
> Federico
>
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>
>> Hi,
>>
>> Thanks for the extra info, it was helpful (I'm not sure why your first
>> logs didn't have the full trace, though).
>>
>> I spent some time digging through the error trace, and currently have
>> some observations I would like to go through first:
>>
>> 1. It seems the ArrayIndexOutOfBoundsException was thrown while trying
>> to access the state and make a copy (via serialization) in the
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the
>> reducing window function (i.e. the maxBy). The state type should be the
>> same as the records in your `events` DataStream, which seems to be a
>> Scala case class with some nested field that requires Kryo for
>> serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>> trying to copy that field.
>>
>> My current guess would be that the serializer used internally may have
>> been incorrectly shared, which is probably why this exception happens
>> randomly for you. I recall similar issues occurring before because some
>> KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
>>
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots (sketch below)
>> and try running this job a bit more to see if the problem still occurs?
>> This is mainly to eliminate my guess on whether or not there is some
>> incorrect serializer usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream record type (case
>> class) looks like?
>>
>> Also looping in Aljoscha and Stefan here, as they would probably have
>> more insights on this.
>>
>> Cheers,
>> Gordon
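>>
>> A minimal sketch of what turning asynchronous snapshots off could look
>> like, assuming the job uses the FsStateBackend (in Flink 1.3.x the
>> boolean constructor argument toggles async snapshots of the heap-based
>> keyed state; the checkpoint path here is a placeholder):
>>
>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend
>>
>>     // false = snapshot the heap-based keyed state synchronously
>>     env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))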
>>
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>> federico.dambrosio@smartlab.ws) wrote:
>>
>> Hi Gordon,
>>
>> I remembered that I had already seen this kind of exception once during
>> the testing of the current job, and fortunately I still had the complete
>> stack trace saved on my pc:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> I don't know why the stack trace is now only printed for the first parts
>> (handleWatermark and HeapReducingState).
>>
>> So it looks like something that has to do with the KryoSerializer. As a
>> KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>
>>     env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
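>>
>> For completeness, a sketch of the same registration with its imports,
>> assuming JodaDateTimeSerializer is the one shipped in the de.javakaffee
>> kryo-serializers library:
>>
>>     import org.joda.time.DateTime
>>     import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
>>
>>     // tell Kryo to use a dedicated serializer for Joda DateTime fields
>>     // instead of its generic field-by-field handling
>>     env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])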
>>
>> I hope this could help.
>>
>> Regards,
>> Federico
>>
>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:
>>
>>> Hi Gordon,
>>>
>>> I'm currently using Flink 1.3.2 in local mode.
>>>
>>> If it's any help, I realized from the log that the complete task which
>>> is failing is:
>>>
>>> 2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>
>>>     val events = keyedStreamByID
>>>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>       .maxBy("time").name("latest_time").uid("latest_time")
>>>
>>>     val activeStream = events
>>>       // Serialization to JsValue
>>>       .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>       // Global windowing, the cause of the exception should be above
>>>       .timeWindowAll(Time.seconds(10))
>>>       .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>
>>>     val historyStream = airtrafficEvents
>>>       // Serialization to JsValue
>>>       .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>       // Global windowing, the cause of the exception should be above
>>>       .timeWindowAll(Time.seconds(10))
>>>       .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>>
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>>>
>>>> Hi,
>>>>
>>>> I'm looking into this. Could you let us know the Flink version in
>>>> which the exceptions occurred?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>>>> federico.dambrosio@smartlab.ws) wrote:
>>>>
>>>> Hi, I'm coming across these Exceptions while running a pretty simple
>>>> Flink job.
>>>>
>>>> First one:
>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>> The second one:
>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>> Since it looks like something is wrong in watermark processing: in my
>>>> case, watermarks are generated in the Kafka source:
>>>>
>>>>     val stream = env.addSource(
>>>>       new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>         .setStartFromLatest()
>>>>         .assignTimestampsAndWatermarks(
>>>>           new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>>             def extractTimestamp(element: Event): Long =
>>>>               element.instantValues.time.getMillis
>>>>           })
>>>>     )
>>>>
>>>> These exceptions aren't really that informative per se and, from what
>>>> I see, the task triggering them is the following operator:
>>>>
>>>>     val events = keyedStreamByID
>>>>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>       .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>>
>>>> What could be the problem here, in your opinion? Is it not emitting
>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>> exceptions, since they seem to happen pretty much randomly.
>>>>
>>>> Thank you all,
>>>> Federico D'Ambrosio
>>>
>>>
>>> --
>>> Federico D'Ambrosio
>>
>>
>> --
>> Federico D'Ambrosio
>
>
> --
> Federico D'Ambrosio

--
Federico D'Ambrosio