flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Previously working job fails on Flink 1.2.0
Date Tue, 21 Feb 2017 15:09:05 GMT
@Steffen

Yes, arrays currently cannot be used as keys. Flink is missing a check
that would reject them with a proper error message.
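
One possible workaround, sketched under assumptions: wrap the array in a
small immutable key type whose equals() and hashCode() depend only on the
contents. The class name LocationKey is hypothetical and not part of Flink
or of the original job. A key like this could be returned from a
KeySelector passed to keyBy, instead of keying on the double[] field
directly.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical wrapper (not a Flink class): an immutable key built from a double[].
public final class LocationKey implements Serializable {

    private final double[] coordinates;

    public LocationKey(double[] coordinates) {
        // Defensive copy, so later changes to the caller's array cannot alter the key.
        this.coordinates = coordinates.clone();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof LocationKey)) {
            return false;
        }
        return Arrays.equals(coordinates, ((LocationKey) other).coordinates);
    }

    @Override
    public int hashCode() {
        // Content-based hash: identical coordinates give the same value in every JVM.
        return Arrays.hashCode(coordinates);
    }

    @Override
    public String toString() {
        return "LocationKey" + Arrays.toString(coordinates);
    }
}
```

Whether a wrapper class, a tuple of doubles, or a String encoding of the
coordinates fits best depends on the job. The only requirement that matters
here is that the hash is derived from the values and never changes.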

The double[] is hashed on the sender side before it is sent. Java's
hashCode() for an array does not take the contents into account; it is the
identity hash of the array object, which makes it a non-deterministic hash.
When the double[] is deserialized and re-hashed on the receiver, you get a
different hash, which is detected as a key-group violation.
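
A minimal sketch of the effect in plain Java, without any Flink classes.
The class and method names are made up for illustration, and clone() stands
in for the serialize/deserialize round trip between sender and receiver.

```java
import java.util.Arrays;

public class ArrayKeyHashDemo {

    // What key.hashCode() returns for an array: the identity hash of the object.
    static int identityHash(double[] key) {
        return key.hashCode();
    }

    // A content-based hash: equal contents always give equal hashes.
    static int contentHash(double[] key) {
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        double[] sent = {40.75, -73.99};
        // Stand-in for serialization and deserialization: same contents, new object.
        double[] received = sent.clone();

        System.out.println("identity hashes equal: "
                + (identityHash(sent) == identityHash(received)));
        System.out.println("content hashes equal:  "
                + (contentHash(sent) == contentHash(received)));
    }
}
```

The identity hashes of the two arrays will almost always differ, although
that is not strictly guaranteed. The content hashes are always equal.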

In fact, your program was probably behaving incorrectly before, but now you
get an error message for it...



On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> if your key is a double[], it is mutable even if the field is declared
> final, because the array entries can still be modified. Maybe that is what
> happened? You can check whether the following two points are in sync, hash-wise:
> KeyGroupStreamPartitioner::selectChannels and AbstractKeyedStateBackend::setCurrentKey.
> The first method basically determines to which parallel operator a tuple is
> routed in a keyed stream. The second is determining the tuple’s key group
> for the backend. Both must be in sync w.r.t. their result of the key-group
> that is determined for the tuple. And this assignment is done based on the
> hash of the key. Therefore, the hash of the tuple’s key must never change,
> i.e. the key must be immutable. If you notice a change in the hash code, that
> change is what breaks your code. I am pretty sure that Flink 1.1.x just
> silently accepted a mutation of the key, but that is arguably
> incorrect.
>
> Best,
> Stefan
>
> > On 21.02.2017 at 14:51, Steffen Hausmann <
> steffen@hausmann-family.de> wrote:
> >
> > Thanks for these pointers, Stefan.
> >
> > I've started a fresh job and didn't migrate any state from a previous
> execution. Moreover, all the fields of all the events I'm using are
> declared final.
> >
> > I've set a breakpoint to figure out which event is causing the problem.
> It turns out that Flink processes the incoming events for some
> time, and an exception is thrown only when a certain window triggers. The
> specific code that causes the exception is as follows:
> >
> >> DataStream<IdleDuration> idleDuration = cleanedTrips
> >>        .keyBy("license")
> >>        .flatMap(new DetermineIdleDuration())
> >>        .filter(duration -> duration.avg_idle_duration >= 0 &&
> duration.avg_idle_duration <= 240)
> >>        .keyBy("location")
> >>        .timeWindow(Time.minutes(10))
> >>        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration>
> input, Collector<IdleDuration> out) -> {
> >>            double[] location = Iterables.get(input, 0).location;
> >>            double avgDuration = StreamSupport
> >>                    .stream(input.spliterator(), false)
> >>                    .mapToDouble(idle -> idle.avg_idle_duration)
> >>                    .average()
> >>                    .getAsDouble();
> >>
> >>            out.collect(new IdleDuration(location, avgDuration,
> window.maxTimestamp()));
> >>        });
> >
> > If the apply statement is removed, there is no exception during runtime.
> >
> > The location field that is referenced by the keyBy statement is actually
> a double[]. Could this be causing the problems I'm experiencing?
> >
> > You can find some more code for additional context in the attached
> document.
> >
> > Thanks for looking into this!
> >
> > Steffen
> >
> >
> >
> > On 20/02/2017 15:22, Stefan Richter wrote:
> >> Hi,
> >>
> >> Flink 1.2 is partitioning all keys into key-groups, the atomic units
> for rescaling. This partitioning is done by hash partitioning and is also
> in sync with the routing of tuples to operator instances (each parallel
> instance of a keyed operator is responsible for some range of key groups).
> This exception means that Flink detected a tuple in the state backend of a
> parallel operator instance that should not be there because, by its key
> hash, it belongs to a different key-group. Or phrased differently, this
> tuple belongs to a different parallel operator instance. Whether this is a
> Flink bug or a user code bug is very hard to tell, and the log does not provide
> additional insights. I could see this happening if your keys are
> mutable and your code makes changes to the object that change the hash
> code. Another question: did you migrate your job from Flink 1.1.3
> through an old savepoint, or did you do a fresh start? Other than that, I
> recommend checking your code for mutation of keys. If this fails
> deterministically, you could also try to set a breakpoint on the line of
> the exception and check whether the key that is about to be inserted is
> somehow special.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>> On 20.02.2017 at 14:32, Steffen Hausmann <
> steffen@hausmann-family.de> wrote:
> >>>
> >>> Hi there,
> >>>
> >>> I’m having problems running a job on Flink 1.2.0 that successfully
> executes on Flink 1.1.3. The job is supposed to read events from a Kinesis
> stream and to send outputs to Elasticsearch and it actually initiates
> successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I
> start to ingest events into the Kinesis stream, the job fails (see the
> attachment for more information):
> >>>
> >>> java.lang.RuntimeException: Unexpected key group index. This indicates
> a bug.
> >>>
> >>> at org.apache.flink.runtime.state.heap.StateTable.set(
> StateTable.java:57)
> >>>
> >>> at org.apache.flink.runtime.state.heap.HeapListState.add(
> HeapListState.java:98)
> >>>
> >>> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:372)
> >>>
> >>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> >>>
> >>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> >>>
> >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> >>>
> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> >>>
> >>> at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> Any ideas what’s going wrong here? The job executes successfully when
> it’s compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3
> cluster. Does this indicate a bug in my code or is this rather a bug in
> Flink? How can I further debug this?
> >>>
> >>> Any guidance is highly appreciated.
> >>>
> >>> Thanks,
> >>>
> >>> Steffen
> >>>
> >>> <log>
> >>
> > <snipplet.java>
>
>
