flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: Previously working job fails on Flink 1.2.0
Date Tue, 21 Feb 2017 14:14:56 GMT
Hi,

if your key is a double[], it is mutable even if the field is declared final, because the
array entries themselves can still be changed, and maybe that is what happened. You can check
whether the following two points are in sync, hash-wise: KeyGroupStreamPartitioner::selectChannels
and AbstractKeyedStateBackend::setCurrentKey. The first method determines to which parallel
operator instance a tuple is routed in a keyed stream. The second determines the tuple’s
key-group for the state backend. Both must agree on the key-group they compute for a tuple,
and this assignment is based on the hash of the key. Therefore, the hash of the tuple’s key
must never change, which means the key must be immutable. If you can observe a change in the
hash code, that change is what breaks your job. I am pretty sure that Flink 1.1.x just silently
accepted such a mutation of the key, but that behavior is arguably incorrect.
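
For illustration, a minimal sketch of why a double[] is problematic as a key even without
explicit mutation (assuming flink-runtime is on the classpath; 128 is just Flink’s default
maximum parallelism): Java arrays inherit the identity-based Object.hashCode(), so a
deserialized copy of a key hashes differently from the original.

import java.util.Arrays;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class ArrayKeyHashDemo {
    public static void main(String[] args) {
        // Two arrays with identical contents, e.g. the key as seen by the
        // partitioner and its deserialized copy at the receiving operator.
        double[] original = {40.7128, -74.0060};
        double[] copy = {40.7128, -74.0060};

        // Arrays do not override hashCode(), so logically equal keys
        // almost certainly hash differently ...
        System.out.println(original.hashCode() == copy.hashCode());             // false
        System.out.println(Arrays.hashCode(original) == Arrays.hashCode(copy)); // true

        // ... and therefore typically end up in different key-groups.
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(original, 128));
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(copy, 128));
    }
}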

Best,
Stefan

> Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <steffen@hausmann-family.de>:
> 
> Thanks for these pointers, Stefan.
> 
> I've started a fresh job and didn't migrate any state from a previous execution. Moreover,
> all the fields of all the events I'm using are declared final.
> 
> I've set a breakpoint to figure out which event is causing the problem, and it turns out
> that Flink processes the incoming events for some time; the exception is only thrown once
> a certain window triggers. The specific code that causes the exception is as follows:
> 
>> DataStream<IdleDuration> idleDuration = cleanedTrips
>>        .keyBy("license")
>>        .flatMap(new DetermineIdleDuration())
>>        .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
>>        .keyBy("location")
>>        .timeWindow(Time.minutes(10))
>>        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
>>            double[] location = Iterables.get(input, 0).location;
>>            double avgDuration = StreamSupport
>>                    .stream(input.spliterator(), false)
>>                    .mapToDouble(idle -> idle.avg_idle_duration)
>>                    .average()
>>                    .getAsDouble();
>> 
>>            out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
>>        });
> 
> If the apply statement is removed, no exception is thrown at runtime.
> 
> The location field that is referenced by the keyBy statement is actually a double[].
> Could this be causing the problems I'm experiencing?
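> 
> If that turns out to be the culprit, I guess I could key by an immutable, value-based
> representation of the location instead of the raw array, e.g. with a KeySelector along
> these lines (just a sketch, reusing the existing IdleDuration class):
> 
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> 
> // Tuple2 implements content-based hashCode()/equals(), so routing and the
> // state backend should agree on the key-group for equal locations.
> public class LocationKey implements KeySelector<IdleDuration, Tuple2<Double, Double>> {
>     @Override
>     public Tuple2<Double, Double> getKey(IdleDuration d) {
>         return new Tuple2<>(d.location[0], d.location[1]);
>     }
> }
> 
> ... and then .keyBy(new LocationKey()) instead of .keyBy("location").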
> 
> You can find some more code for additional context in the attached document.
> 
> Thanks for looking into this!
> 
> Steffen
> 
> 
> 
> On 20/02/2017 15:22, Stefan Richter wrote:
>> Hi,
>> 
>> Flink 1.2 partitions all keys into key-groups, the atomic units for rescaling. This
>> partitioning is done by hash partitioning and is also in sync with the routing of tuples
>> to operator instances (each parallel instance of a keyed operator is responsible for some
>> range of key-groups). This exception means that Flink detected a tuple in the state backend
>> of a parallel operator instance that should not be there because, by its key hash, it belongs
>> to a different key-group, or, phrased differently, to a different parallel operator instance.
>> Whether this is a Flink bug or a user-code bug is very hard to tell, and the log does not
>> provide additional insights either. I could see this happen if your keys are mutable and
>> your code makes changes to a key object that alter its hash code. Another question: did you
>> migrate your job from Flink 1.1.3 through an old savepoint, or did you do a fresh start?
>> Other than that, I can recommend checking your code for mutation of keys. If this fails
>> deterministically, you could also set a breakpoint at the line of the exception and take
>> a look at whether the key that is about to be inserted is somehow special.
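>> 
>> Conceptually, the assignment works like this (a simplified sketch mirroring
>> KeyGroupRangeAssignment in flink-runtime and MathUtils in flink-core):
>> 
>> // Key-group: murmur hash of the key's hashCode(), modulo the maximum parallelism.
>> static int keyGroupFor(Object key, int maxParallelism) {
>>     return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
>> }
>> 
>> // Each parallel operator instance owns a contiguous range of key-groups.
>> static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
>>     return keyGroup * parallelism / maxParallelism;
>> }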
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann <steffen@hausmann-family.de>:
>>> 
>>> Hi there,
>>> 
>>> I’m having problems running a job on Flink 1.2.0 that executes successfully on Flink 1.1.3.
>>> The job is supposed to read events from a Kinesis stream and send its output to Elasticsearch.
>>> It actually starts up successfully on a Flink 1.2.0 cluster running on YARN, but as soon as
>>> I start to ingest events into the Kinesis stream, the job fails (see the attachment for more
>>> information):
>>> 
>>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>>>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>>>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> 
>>> Any ideas what’s going wrong here? The job executes successfully when it’s compiled
>>> against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. Does this indicate
>>> a bug in my code, or is it rather a bug in Flink? How can I debug this further?
>>> 
>>> Any guidance is highly appreciated.
>>> 
>>> Thanks,
>>> 
>>> Steffen
>>> 
>>> <log>
>> 
> <snipplet.java>

