flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: Previously working job fails on Flink 1.2.0
Date Mon, 20 Feb 2017 14:22:49 GMT

Flink 1.2 partitions all keys into key-groups, the atomic units for rescaling. This partitioning
is done by hash partitioning and is kept in sync with the routing of tuples to operator instances
(each parallel instance of a keyed operator is responsible for some range of key-groups).
This exception means that Flink detected a tuple in the state backend of a parallel operator
instance that should not be there because, by its key hash, it belongs to a different key-group.
Or, phrased differently, this tuple belongs to a different parallel operator instance. Whether this
is a Flink bug or a user code bug is very hard to tell, and the log does not provide additional
insights either. I could see this happening if your keys are mutable and your code changes the
key object in a way that changes its hash code. Another question: did you migrate your job
from Flink 1.1.3 via an old savepoint, or did you do a fresh start? Other than that, I recommend
checking your code for key mutation. If this fails deterministically, you could also set a
breakpoint at the line that throws the exception and check whether the key that is about to be
inserted is somehow special.
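The following is a minimal, self-contained Java sketch of the mechanism described above. It models key-group routing and shows how a key that is mutated after routing ends up in a key-group that the current instance does not own. It is a simplified model, not Flink's implementation. `assignToKeyGroup` uses `Math.floorMod(hashCode, maxParallelism)` only; Flink, as far as I know, also applies a murmur hash to `hashCode()` first. `MutableKey` and `InstanceState` are hypothetical stand-ins for a user key type and a per-instance state table. The values 128 and 4 for max parallelism and parallelism are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

class KeyGroupSketch {

    // Simplified key-group assignment. ASSUMPTION: Flink additionally scrambles
    // hashCode() with a murmur hash before the modulus; floorMod alone keeps this self-contained.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int operatorIndexForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Hypothetical user key whose hashCode() depends on a mutable field.
    static final class MutableKey {
        int id;

        MutableKey(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && ((MutableKey) o).id == id;
        }

        @Override
        public int hashCode() {
            return id; // changes whenever id changes
        }
    }

    // Hypothetical per-instance state table that rejects keys outside its key-group range,
    // similar in spirit to the check behind "Unexpected key group index".
    static final class InstanceState {
        final int instance;
        final int maxParallelism;
        final int parallelism;
        final Map<Object, String> values = new HashMap<>();

        InstanceState(int instance, int maxParallelism, int parallelism) {
            this.instance = instance;
            this.maxParallelism = maxParallelism;
            this.parallelism = parallelism;
        }

        void put(Object key, String value) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int owner = operatorIndexForKeyGroup(keyGroup, maxParallelism, parallelism);
            if (owner != instance) {
                throw new IllegalStateException(
                        "Unexpected key group index " + keyGroup + " for instance " + instance);
            }
            values.put(key, value);
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        MutableKey key = new MutableKey(5);
        int groupBefore = assignToKeyGroup(key, maxParallelism);
        int owner = operatorIndexForKeyGroup(groupBefore, maxParallelism, parallelism);
        InstanceState state = new InstanceState(owner, maxParallelism, parallelism);
        state.put(key, "first"); // accepted: routed to the instance that owns its key group
        System.out.println("key group " + groupBefore + " -> instance " + owner);

        // The record is already on this instance; mutating the key changes its hash.
        key.id = 100;
        int groupAfter = assignToKeyGroup(key, maxParallelism);
        System.out.println("after mutation: key group " + groupAfter + " -> instance "
                + operatorIndexForKeyGroup(groupAfter, maxParallelism, parallelism));
        try {
            state.put(key, "second");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, key 5 maps to key group 5, which instance 0 owns. After the mutation the same object maps to key group 100, which instance 3 owns. The second `put` on instance 0 is therefore rejected. Keys with final fields and a `hashCode()` that cannot change rule out this particular failure mode.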


> On 20.02.2017 at 14:32, Steffen Hausmann <steffen@hausmann-family.de> wrote:
> Hi there,
> I’m having problems running a job on Flink 1.2.0 that successfully executes on Flink
> 1.1.3. The job is supposed to read events from a Kinesis stream and to send outputs to Elasticsearch,
> and it actually initializes successfully on a Flink 1.2.0 cluster running on YARN, but as soon
> as I start to ingest events into the Kinesis stream, the job fails (see the attachment for
> more information):
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
> at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Any ideas what’s going wrong here? The job executes successfully when it’s compiled
> against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. Does this indicate a bug
> in my code, or is this rather a bug in Flink? How can I debug this further?
> Any guidance is highly appreciated.
> Thanks,
> Steffen
> <log>
