flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: UnilateralSortMerger error (again)
Date Fri, 21 Apr 2017 19:49:01 GMT
Hi Flavio,

These files are used for spilling data to disk. In your case, they hold
sorted runs of records.
Later, all of these sorted runs (up to a fan-out threshold) are read back
and merged to produce a completely sorted record stream.
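
To illustrate the spill-and-merge idea, here is a minimal sketch in plain
Java. It is not Flink's actual UnilateralSortMerger code: the class
SpillMergeSketch, its method names, and the parameters memoryCapacity and
maxFanIn are hypothetical, and in-memory lists stand in for the spill files.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

public class SpillMergeSketch {

    /** Cursor over one sorted run; a real sorter would read from a spill file instead. */
    private static final class RunCursor {
        final List<Integer> run;
        int pos = 0;
        RunCursor(List<Integer> run) { this.run = run; }
        int head() { return run.get(pos); }
        boolean advance() { return ++pos < run.size(); }
    }

    /** Phase 1: cut the input into sorted runs of at most memoryCapacity records. */
    public static Deque<List<Integer>> createSortedRuns(List<Integer> input, int memoryCapacity) {
        Deque<List<Integer>> runs = new ArrayDeque<>();
        for (int start = 0; start < input.size(); start += memoryCapacity) {
            int end = Math.min(start + memoryCapacity, input.size());
            List<Integer> run = new ArrayList<>(input.subList(start, end));
            Collections.sort(run);
            runs.add(run); // stands in for writing the sorted run to disk
        }
        return runs;
    }

    /** Merges several sorted runs into one sorted run using a min-heap of cursors. */
    public static List<Integer> mergeRuns(List<List<Integer>> runs) {
        PriorityQueue<RunCursor> heap =
                new PriorityQueue<>((a, b) -> Integer.compare(a.head(), b.head()));
        for (List<Integer> run : runs) {
            if (!run.isEmpty()) {
                heap.add(new RunCursor(run));
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            RunCursor cursor = heap.poll();
            merged.add(cursor.head());
            if (cursor.advance()) {
                heap.add(cursor); // re-insert with its next record as the new key
            }
        }
        return merged;
    }

    /** Phase 2: merge at most maxFanIn runs at a time until a single run remains. */
    public static List<Integer> sortWithSpilling(List<Integer> input, int memoryCapacity, int maxFanIn) {
        if (memoryCapacity < 1 || maxFanIn < 2) {
            throw new IllegalArgumentException("memoryCapacity must be >= 1 and maxFanIn >= 2");
        }
        Deque<List<Integer>> runs = createSortedRuns(input, memoryCapacity);
        while (runs.size() > 1) {
            List<List<Integer>> batch = new ArrayList<>();
            while (batch.size() < maxFanIn && !runs.isEmpty()) {
                batch.add(runs.poll());
            }
            runs.add(mergeRuns(batch)); // the merged result becomes a new, longer run
        }
        return runs.isEmpty() ? new ArrayList<>() : runs.poll();
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(9, 4, 7, 1, 8, 2, 6, 3, 5);
        // Three runs of three records each, merged two at a time.
        System.out.println(sortWithSpilling(input, 3, 2)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a small maxFanIn the merge takes several passes, and each intermediate
result is again a sorted run. That is why the number of spill files can grow
and shrink while the sort is running.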

2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> The error appears as soon as some taskmanager generates some inputchannel
> file.
> What are those files used for?
>
> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier
> <pompermaier@okkam.it> wrote:
>
>> In another run of the job I had another Exception. Could it be helpful?
>>
>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>> ... 5 more
>>
>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>> stefano.bortoli@huawei.com> wrote:
>>
>>> In fact, the old problem was that the KryoSerializer was not
>>> re-initialized after the exception that triggers spilling to disk.
>>> This left a dirty serialization buffer that would eventually break
>>> the program. Till worked on it by debugging the source code that
>>> generated the error. Perhaps someone could try the same this time, if
>>> Flavio can make the problem reproducible in a shareable program and data set.
>>>
>>>
>>>
>>> Stefano
>>>
>>>
>>>
>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>
>>>
>>>
>>> In the past, these errors were most often caused by bugs in the
>>> serializers, not in the sorter.
>>>
>>>
>>>
>>> What types are you using at that point? The stack trace reveals Row and
>>> StringValue. Are any other types involved?
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>> spilling to disk) and the job failed almost immediately.
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> I debugged the process a bit by repeating the job on sub-slices of the
>>> entire data (using the id value to filter data with Parquet push-down
>>> filters), and all slices completed successfully :(
>>>
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> whether this was somehow a stress factor, but it didn't cause any error.
>>>
>>> Then I almost doubled the number of rows to process and the error
>>> finally showed up again.
>>>
>>> It seems somehow related to spilling to disk, but I can't really
>>> understand what's going on :(
>>>
>>> This is a summary of my debug attempts:
>>>
>>>
>>>
>>> 4 task managers with 6 GB and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>> 4 task managers with 8 GB and 4 slots each, parallelism = 16
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>
>>>
>>>
>>> Any help is appreciated.
>>>
>>> Best,
>>>
>>> Flavio
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> I could, but only if there's a good probability that it fixes the
>>> problem... How confident are you about it?
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>
>>> Looking at the git log of DataInputDeserializer.java, there have been
>>> some recent changes.
>>>
>>>
>>>
>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>> reproducible?
>>>
>>>
>>>
>>> Cheers
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> Hi to all,
>>>
>>> I think I've again run into the weird exception with the
>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>
>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>> disk, but the exception thrown is not very helpful. Any idea?
>>>
>>>
>>>
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException
>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>>
>>> Best,
>>>
>>> Flavio
