flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefano Bortoli <stefano.bort...@huawei.com>
Subject RE: UnilateralSortMerger error (again)
Date Fri, 21 Apr 2017 09:43:23 GMT
In fact the old problem was with the KryoSerializer missed initialization on the exception
that would trigger the spilling on disk. This would lead to dirty serialization buffer that
would eventually break the program. Till worked on it debugging the source code generating
the error. Perhaps someone could try the same also this time. If Flavio can make the problem
reproducible in a shareable program+data.

Stefano

From: Stephan Ewen [mailto:sewen@apache.org]
Sent: Friday, April 21, 2017 10:04 AM
To: user <user@flink.apache.org>
Subject: Re: UnilateralSortMerger error (again)

In the past, these errors were most often caused by bugs in the serializers, not in the sorter.

What types are you using at that point? The Stack Trace reveals ROW and StringValue, any other
involved types?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pompermaier@okkam.it<mailto:pompermaier@okkam.it>>
wrote:
As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling to disk) and
the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it<mailto:pompermaier@okkam.it>>
wrote:
I debugged a bit the process repeating the job on a sub-slice of the entire data (using the
id value to filter data with parquet push down filters) and all slices completed successfully
:(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow
a factor of stress but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error showed up again.
It seems somehow related to spilling to disk but I can't really understand what's going on
:(
This is a summary of my debug attempts:

4 Task managers with 6 GB  and 1 slot each, parallelism = 4

id < 10.000.000.000  => 1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows =>
OK
id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
OK
id >= 99.960.000.000 => 32.936.422 rows => OK

4 TM with 8 GB and 4 slot each, parallelism 16

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000  => 56.825.172 rows => ERROR

Any help is appreciated..
Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it<mailto:pompermaier@okkam.it>>
wrote:
I could but only if there's a good probability that it fix the problem...how confident are
you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com<mailto:yuzhihong@gmail.com>>
wrote:
Looking at git log of DataInputDeserializer.java , there has been some recent change.

If you have time, maybe try with 1.2.1 RC and see if the error is reproducible ?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it<mailto:pompermaier@okkam.it>>
wrote:
Hi to all,
I think I'm again on the weird Exception with the SpillingAdaptiveSpanningRecordDeserializer...
I'm using Flink 1.2.0 and the error seems to rise when Flink spills to disk but the Exception
thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception:
null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
at org.apache.flink.types.StringValue.readString(StringValue.java:747)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de<http://utils.runtime.RowSerializer.de>serialize(RowSerializer.java:193)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de<http://utils.runtime.RowSerializer.de>serialize(RowSerializer.java:36)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


Best,
Flavio





Mime
View raw message