flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state
Date Fri, 29 Sep 2017 13:38:28 GMT
Hi,

I’m looking into this. Could you let us know the Flink version in which the exceptions occurred?

Cheers,
Gordon

On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:

Hi, I'm coming across these Exceptions while running a pretty simple flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:  
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case Watermarks are
generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        def extractTimestamp(element: AirTrafficEvent): Long =
          element.instantValues.time.getMillis
      })
)
These exceptions aren't really that informative per se and, from what I see, the task triggering
these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here in your opinion? It's not emitting watermarks correctly? I'm
not even how I could reproduce this exceptions, since it looks like they happen pretty much
randomly.

Thank you all,
Federico D'Ambrosio
Mime
View raw message