flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: UnilateralSortMerger error (again)
Date Fri, 21 Apr 2017 09:53:18 GMT
In another run of the job I got a different exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception:
Serializer consumed more bytes than the record had. This indicates broken
serialization. If you are using custom serialization types (Value or
Writable), check their serialization methods. If you are using a
Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
... 5 more
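
To illustrate what "Serializer consumed more bytes than the record had" means in general, here is a minimal sketch in plain Java (java.io only, not Flink code). The class SerializerRoundTripCheck, the Rec type, and the write/readSymmetric/readAsymmetric helpers are hypothetical names made up for this illustration. It is not a diagnosis of the failure above, where only the built-in Row and String serializers appear in the trace; it only shows how a reader that does not mirror its writer runs past the end of a record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializerRoundTripCheck {

    /** Hypothetical record with one numeric and one string field. */
    static final class Rec {
        final int id;
        final String name;

        Rec(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Writes the id as 4 bytes, then the name with a 2-byte length prefix. */
    static byte[] write(Rec r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(r.id);
            out.writeUTF(r.name);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Correct reader: mirrors write() field by field. */
    static Rec readSymmetric(DataInputStream in) throws IOException {
        return new Rec(in.readInt(), in.readUTF());
    }

    /** Broken reader: reads the id as a long (8 bytes) although only 4 were written. */
    static Rec readAsymmetric(DataInputStream in) throws IOException {
        long id = in.readLong();
        return new Rec((int) id, in.readUTF());
    }

    /** Returns true only if the chosen reader consumes exactly the record's bytes. */
    static boolean consumesExactly(byte[] record, boolean symmetric) {
        ByteArrayInputStream bytes = new ByteArrayInputStream(record);
        DataInputStream in = new DataInputStream(bytes);
        try {
            if (symmetric) {
                readSymmetric(in);
            } else {
                readAsymmetric(in);
            }
            return bytes.available() == 0;
        } catch (IOException e) {
            // The reader ran off the end of the record.
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] record = write(new Rec(42, "flink"));
        System.out.println("symmetric reader ok:  " + consumesExactly(record, true));
        System.out.println("asymmetric reader ok: " + consumesExactly(record, false));
    }
}
```

In the broken reader the misaligned read makes the string length prefix come from the middle of the payload, so the reader asks for far more bytes than the record holds. A round-trip check of this kind on each custom type is one cheap way to rule out the "broken serialization" case the error message mentions.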

On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
stefano.bortoli@huawei.com> wrote:

> In fact, the old problem was a missed initialization of the KryoSerializer
> on the exception that triggers spilling to disk. This left a dirty
> serialization buffer that would eventually break the program. Till worked
> on it by debugging the source code that generated the error. Perhaps
> someone could try the same this time, if Flavio can make the problem
> reproducible in a shareable program and data set.
>
>
>
> Stefano
>
>
>
> *From:* Stephan Ewen [mailto:sewen@apache.org]
> *Sent:* Friday, April 21, 2017 10:04 AM
> *To:* user <user@flink.apache.org>
> *Subject:* Re: UnilateralSortMerger error (again)
>
>
>
> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
>
>
> What types are you using at that point? The stack trace reveals Row and
> StringValue; are any other types involved?
>
>
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
> spilling to disk) and the job failed almost immediately.
>
>
>
> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
> I debugged the process a bit by repeating the job on sub-slices of the
> entire data (using the id value to filter data with Parquet push-down
> filters) and all slices completed successfully :(
>
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if
> this was somehow a stress factor, but it didn't cause any error.
>
> Then I almost doubled the number of rows to process and finally the error
> showed up again.
>
> It seems somehow related to spilling to disk but I can't really understand
> what's going on :(
>
> This is a summary of my debug attempts:
>
>
>
> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>
> id < 10.000.000.000 => 1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
> 4 TM with 8 GB and 4 slots each, parallelism 16
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>
>
>
> Any help is appreciated..
>
> Best,
>
> Flavio
>
>
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
> I could, but only if there's a good probability that it fixes the
> problem... How confident are you about it?
>
>
>
> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>
> Looking at the git log of DataInputDeserializer.java, there have been some
> recent changes.
>
>
>
> If you have time, maybe try with 1.2.1 RC and see if the error is
> reproducible?
>
>
>
> Cheers
>
>
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
> Hi to all,
>
> I think I've run into the weird exception with the
> SpillingAdaptiveSpanningRecordDeserializer again...
>
> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
> disk, but the exception thrown is not very helpful. Any idea?
>
>
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
>
> Best,
>
> Flavio
