flink-user mailing list archives

From "Federico D'Ambrosio" <federico.dambro...@smartlab.ws>
Subject Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state
Date Thu, 12 Oct 2017 16:22:14 GMT
Hi Aljoscha,

yes, just as you guessed: without asynchronous checkpoints, there has
been no crash so far.
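
For reference, a minimal sketch of how asynchronous snapshots can be
switched off in 1.3.x (assuming the FsStateBackend; the checkpoint path
here is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the second constructor argument is asynchronousSnapshots;
// passing false forces synchronous snapshots of the heap state
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))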

Regards,
Federico

2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:

> Hi Federico,
>
> I'm guessing the job is still working without asynchronous checkpoints? I'm
> very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
>
> Best,
> Aljoscha
>
>
> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <
> federico.dambrosio@smartlab.ws> wrote:
>
> As a followup:
>
> the Flink job currently has an uptime of almost 24 hours, with no failed
> checkpoint or restart, whereas with async snapshots it would have already
> crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
> federico.dambrosio@smartlab.ws>:
>
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without the asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with two fields: a String ID
>> and a composite case class (let's call it RealEvent) containing three
>> fields of the following types: Information, a case class with String
>> fields; Coordinates, a nested case class with two Doubles; and
>> InstantValues, with three Integers and a DateTime. The DateTime field in
>> InstantValues is the one being evaluated in the maxBy (via the compareTo
>> implementations of InstantValues and RealEvent, because dot notation does
>> not work in Scala as of 1.3.2, FLINK-7629
>> <https://issues.apache.org/jira/browse/FLINK-7629>), and that was the
>> reason I had to register the JodaDateTimeSerializer with Kryo in the first
>> place.
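>>
>> In code, the shape is roughly the following (a sketch; all field names
>> except `time` are illustrative):
>>
>> import org.joda.time.DateTime
>>
>> case class Information(source: String, description: String)
>> case class Coordinates(latitude: Double, longitude: Double)
>>
>> case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
>>     extends Ordered[InstantValues] {
>>   // compareTo comes from Ordered; maxBy falls back to this ordering since
>>   // nested field expressions are not supported (FLINK-7629)
>>   override def compare(that: InstantValues): Int =
>>     this.time.getMillis.compareTo(that.time.getMillis)
>> }
>>
>> case class RealEvent(info: Information, coordinates: Coordinates, instantValues: InstantValues)
>>     extends Ordered[RealEvent] {
>>   override def compare(that: RealEvent): Int =
>>     this.instantValues.compare(that.instantValues)
>> }
>>
>> case class Event(id: String, event: RealEvent)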
>>
>> Regards,
>> Federico
>>
>>
>>
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>>
>>> Hi,
>>>
>>> Thanks for the extra info, it was helpful (I’m not sure why your first
>>> logs didn’t have the full trace, though).
>>>
>>> I spent some time digging through the error trace, and currently have
>>> some observations I would like to go through first:
>>>
>>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>>> trying to access the state and making a copy (via serialization) in the
>>> CopyOnWriteStateTable.
>>> 2. The state that caused the exception seems to be the state of the
>>> reducing window function (i.e. the maxBy). The state type should be the
>>> same as the records in your `events` DataStream, which seems to be a Scala
>>> case class with some nested field that requires Kryo for serialization.
>>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>>> trying to copy that field (sketched conceptually below).
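>>>
>>> A sketch of what that copy-on-access amounts to (illustrative, not actual
>>> Flink source; `stateSerializer` and `storedValue` are placeholders): the
>>> heap backend returns a defensive copy made through the state's
>>> TypeSerializer, so a serializer fault surfaces on a plain state read
>>> rather than only at checkpoint time.
>>>
>>> // CopyOnWriteStateTable#get (conceptually): hand the caller a copy
>>> val copyForCaller: Event = stateSerializer.copy(storedValue)
>>>
>>> The CaseClassSerializer then delegates the Kryo-handled nested field to
>>> the KryoSerializer, which is where the exception is thrown.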
>>>
>>> My current guess is that the internally used serializer may have been
>>> incorrectly shared, which would also explain why this exception happens
>>> randomly for you.
>>> I recall similar issues occurring before because some Kryo serializers
>>> aren't thread-safe and were incorrectly shared in Flink.
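>>>
>>> To illustrate the thread-safety point (a sketch only; the variable names
>>> are made up): Kryo instances keep mutable internal state, e.g. the
>>> reference-resolver stack that IntArray.pop touches in your trace, so two
>>> threads sharing one instance can corrupt it.
>>>
>>> import com.esotericsoftware.kryo.Kryo
>>>
>>> val kryo = new Kryo() // NOT thread-safe
>>> // thread A: kryo.copy(eventA)  -- concurrent calls can leave the internal
>>> // thread B: kryo.copy(eventB)  -- reference stack inconsistent (AIOOBE)
>>>
>>> // safe pattern: one instance per thread
>>> val kryoPerThread = new ThreadLocal[Kryo] {
>>>   override def initialValue(): Kryo = new Kryo()
>>> }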
>>>
>>> I may need some help from you to be able to look at this a bit more:
>>> - Is it possible for you to disable asynchronous snapshots and run this
>>> job a bit more, to see if the problem still occurs? This is mainly to
>>> confirm or eliminate my guess that there is some incorrect serializer
>>> usage in the CopyOnWriteStateTable.
>>> - Could you let us know what the record type (the case class) of your
>>> `events` DataStream looks like?
>>>
>>> Also looping in Aljoscha and Stefan here, as they would probably have
>>> more insight into this.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>>> federico.dambrosio@smartlab.ws) wrote:
>>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once while
>>> testing the current job, and fortunately I still had the complete stack
>>> trace saved on my PC:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>> I don't know why the stack trace is now being output only up to the
>>> first frames (handleWatermark and HeapReducingState).
>>>
>>> So it looks like something to do with the KryoSerializer. As the Kryo
>>> serializer for DateTime I'm using JodaDateTimeSerializer (from the
>>> kryo-serializers library), registered as follows:
>>>
>>> import org.joda.time.DateTime
>>> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
>>>
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>>>
>>> I hope this could help.
>>>
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
>>> federico.dambrosio@smartlab.ws>:
>>>
>>>> Hi Gordon,
>>>>
>>>> I'm currently using Flink 1.3.2 in local mode.
>>>>
>>>> If it's of any help, I realized from the log that the complete task
>>>> that is failing is:
>>>>
>>>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>>
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>>
>>>> val activeStream = events
>>>>   // Serialization to JsValue
>>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>>   // Global windowing; the cause of the exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>>
>>>> val historyStream = airtrafficEvents
>>>>   // Serialization to JsValue
>>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>>   // Global windowing; the cause of the exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Federico
>>>>
>>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I’m looking into this. Could you let us know the Flink version in
>>>>> which the exceptions occurred?
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>>>>> federico.dambrosio@smartlab.ws) wrote:
>>>>>
>>>>> Hi, I'm coming across these exceptions while running a pretty simple
>>>>> Flink job.
>>>>>
>>>>> First one:
>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>> The second one:
>>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>
>>>>> Since it looks like something is wrong in watermark processing: in my
>>>>> case, watermarks are generated in the Kafka source:
>>>>>
>>>>> val stream = env.addSource(
>>>>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>>     .setStartFromLatest()
>>>>>     .assignTimestampsAndWatermarks(
>>>>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>>>         override def extractTimestamp(element: Event): Long =
>>>>>           element.instantValues.time.getMillis
>>>>>       })
>>>>> )
>>>>>
>>>>> These exceptions aren't really that informative per se and, from what
>>>>> I see, the task triggering them is the following operator:
>>>>>
>>>>> val events = keyedStreamByID
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>>>
>>>>> What could the problem be here, in your opinion? Is it not emitting
>>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>>> exceptions, since they seem to happen pretty much randomly.
>>>>>
>>>>> Thank you all,
>>>>> Federico D'Ambrosio
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Federico D'Ambrosio
>>>>
>>>
>>>
>>>
>>> --
>>> Federico D'Ambrosio
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio
