flink-user mailing list archives

From Flavio Pompermaier <pompermaier@okkam.it>
Subject Re: UnilateralSortMerger error (again)
Date Fri, 21 Apr 2017 19:53:22 GMT
Thanks for the explanation. Is there a way to force this behaviour in a
local environment (to try to debug the problem)?
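
A minimal sketch of what I mean, assuming the taskmanager.memory.size
setting discussed below is also honored when passed to a local
environment (the key and the tiny value here are my assumption, chosen
only to force early spilling):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class SpillDebugJob {
    public static void main(String[] args) throws Exception {
        // Shrink the managed memory so the sorter runs out of buffers
        // and spills to disk almost immediately, as on the cluster.
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.size", "16"); // MB, deliberately tiny
        ExecutionEnvironment env =
                ExecutionEnvironment.createLocalEnvironment(conf);
        // ... build the same DataSet job here on a small data sample ...
        env.execute("spill-debug");
    }
}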

On 21 Apr 2017 21:49, "Fabian Hueske" <fhueske@gmail.com> wrote:

> Hi Flavio,
>
> these files are used for spilling data to disk; in your case, sorted runs
> of records.
> Later, all of these sorted runs (up to a fan-out threshold) are read and
> merged to produce a completely sorted record stream.
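>
> To illustrate what the merge phase does conceptually, here is a minimal
> k-way merge sketch over already-sorted runs (my own illustration, not
> Flink's actual implementation):
>
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.Iterator;
> import java.util.List;
> import java.util.PriorityQueue;
>
> public class MergeRuns {
>
>     // One cursor per spilled run: the current head plus the rest of the run.
>     private static final class Cursor {
>         int head;
>         final Iterator<Integer> rest;
>         Cursor(Iterator<Integer> it) { this.rest = it; this.head = it.next(); }
>     }
>
>     // Merges k individually sorted runs into one fully sorted stream.
>     static List<Integer> merge(List<List<Integer>> runs) {
>         PriorityQueue<Cursor> heap =
>                 new PriorityQueue<>(Comparator.comparingInt(c -> c.head));
>         for (List<Integer> run : runs) {
>             if (!run.isEmpty()) heap.add(new Cursor(run.iterator()));
>         }
>         List<Integer> out = new ArrayList<>();
>         while (!heap.isEmpty()) {
>             Cursor c = heap.poll();     // run with the smallest current head
>             out.add(c.head);
>             if (c.rest.hasNext()) {     // advance that run and re-insert it
>                 c.head = c.rest.next();
>                 heap.add(c);
>             }
>         }
>         return out;
>     }
> }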
>
> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> The error appears as soon as one of the taskmanagers generates an
>> inputchannel file.
>> What are those files used for?
>>
>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> In another run of the job I got a different exception. Could it be helpful?
>>>
>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>> Serializer consumed more bytes than the record had. This indicates broken
>>> serialization. If you are using custom serialization types (Value or
>>> Writable), check their serialization methods. If you are using a
>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>> ... 5 more
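>>>
>>> As the exception message suggests, the thing to check in a custom type
>>> is that read() consumes exactly the bytes that write() produced. A
>>> minimal sketch of a symmetric Value implementation (MyIdValue is a
>>> hypothetical example, not one of the types in this job):
>>>
>>> import java.io.IOException;
>>> import org.apache.flink.core.memory.DataInputView;
>>> import org.apache.flink.core.memory.DataOutputView;
>>> import org.apache.flink.types.Value;
>>>
>>> public class MyIdValue implements Value {
>>>     private long id;
>>>     private String name;
>>>
>>>     @Override
>>>     public void write(DataOutputView out) throws IOException {
>>>         out.writeLong(id);
>>>         out.writeUTF(name); // length-prefixed, so read() knows how much to take
>>>     }
>>>
>>>     @Override
>>>     public void read(DataInputView in) throws IOException {
>>>         id = in.readLong();  // same order and width as in write()
>>>         name = in.readUTF(); // any asymmetry yields "consumed more bytes than the record had"
>>>     }
>>> }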
>>>
>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>> stefano.bortoli@huawei.com> wrote:
>>>
>>>> In fact, the old problem was that the KryoSerializer missed
>>>> initialization on the exception path that triggers spilling to disk.
>>>> This led to a dirty serialization buffer that would eventually break
>>>> the program. Till worked on it by debugging the source code that
>>>> generated the error. Perhaps someone could try the same this time, if
>>>> Flavio can make the problem reproducible with a shareable program and
>>>> dataset.
>>>>
>>>>
>>>>
>>>> Stefano
>>>>
>>>>
>>>>
>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>
>>>>
>>>>
>>>> In the past, these errors were most often caused by bugs in the
>>>> serializers, not in the sorter.
>>>>
>>>>
>>>>
>>>> What types are you using at that point? The stack trace reveals Row and
>>>> StringValue; are any other types involved?
>>>>
>>>>
>>>>
>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>>> spilling to disk), and the job failed almost immediately.
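>>>>
>>>> (For reference, that corresponds to this fragment in flink-conf.yaml;
>>>> the value is the managed memory in megabytes, so smaller values make
>>>> the sort buffers spill to disk earlier:)
>>>>
>>>> taskmanager.memory.size: 1024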
>>>>
>>>>
>>>>
>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> I debugged the process a bit by repeating the job on sub-slices of the
>>>> entire dataset (using the id value to filter data with Parquet push-down
>>>> filters), and all slices completed successfully :(
>>>>
>>>> So I tried increasing the parallelism (from 1 slot per TM to 4) to see
>>>> if this was somehow a factor of stress, but it didn't cause any error.
>>>>
>>>> Then I almost doubled the number of rows to process, and the error
>>>> finally showed up again.
>>>>
>>>> It seems somehow related to spilling to disk, but I can't really
>>>> understand what's going on :(
>>>>
>>>> This is a summary of my debug attempts:
>>>>
>>>>
>>>>
>>>> 4 task managers with 6 GB and 1 slot each, parallelism = 4
>>>>
>>>> id < 10.000.000.000                         =>  1.857.365 rows => OK
>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>> id >= 99.960.000.000                        => 32.936.422 rows => OK
>>>>
>>>> 4 task managers with 8 GB and 4 slots each, parallelism = 16
>>>>
>>>> id >= 99.960.000.000                        => 32.936.422 rows => OK
>>>> id >= 99.945.000.000                        => 56.825.172 rows => ERROR
>>>>
>>>>
>>>>
>>>> Any help is appreciated.
>>>>
>>>> Best,
>>>>
>>>> Flavio
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> I could, but only if there's a good probability that it fixes the
>>>> problem... how confident are you about it?
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>>
>>>> Looking at the git log of DataInputDeserializer.java, there have been
>>>> some recent changes.
>>>>
>>>>
>>>>
>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>> reproducible?
>>>>
>>>>
>>>>
>>>> Cheers
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> Hi to all,
>>>>
>>>> I think I'm hitting the weird exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>
>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills
>>>> to disk, but the exception thrown is not very helpful. Any idea?
>>>>
>>>>
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>>
>>>> Best,
>>>>
>>>> Flavio
>>>>
>>>
>>>
>>>
>>
>
