flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: UnilateralSortMerger error (again)
Date Thu, 27 Apr 2017 12:23:29 GMT
Thanks a lot Kurt!

On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt836@gmail.com> wrote:

> Thanks for the test case, I will take a look at it.
>
> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>
>> I've created a repository with a unit test to reproduce the error at
>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>> (probably this error is also related to FLINK-4719).
>>
>> The exception is thrown only when there are null strings and multiple
>> slots per TM. I don't know whether UnilateralSortMerger is involved or
>> not (but I think so..).
>> A quick fix for this problem would be very appreciated because it's
>> blocking a production deployment..
>>
>> Thanks in advance to all,
>> Flavio
>>
>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> After digging into the code and tests, I think that the problem is almost
>>> certainly in the UnilateralSortMerger; there must be a missing
>>> synchronization on some shared object somewhere... Right now I'm trying to
>>> understand whether this section of code creates some shared object (like queues)
>>> that is accessed in a bad way when there's spilling to disk:
>>>
>>> // start the thread that reads the input channels
>>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
>>>         largeRecordHandler, parentTask, serializer,
>>>         ((long) (startSpillingFraction * sortMemory)));
>>>
>>> // start the thread that sorts the buffers
>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>>
>>> // start the thread that handles spilling to secondary storage
>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>>         memoryManager, ioManager, serializerFactory, comparator,
>>>         this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>> ...
>>> startThreads();
>>>
>>>
>>> The problem is that the unit tests of GroupReduceDriver use Record, and
>>> testing Rows is not very straightforward, so I'm still trying to reproduce
>>> the problem in a local env..
>>>
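The reader/sorter hand-off that the quoted snippet starts can be pictured as a plain producer/consumer pair. The sketch below is hypothetical Java, not Flink code: the class and method names are invented for illustration, and a `BlockingQueue` stands in for Flink's `circularQueues` as the only channel between the two threads. Its internal locking is what makes the buffer hand-off safe; if such a queue were not properly synchronized, the sorting thread could observe a buffer in a half-written state, which is one plausible shape of the suspected bug.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch (not Flink code) of a reading thread that fills
// buffers and passes them through a shared queue to a sorting thread.
public class SorterThreadsSketch {

    public static List<List<Integer>> runPipeline(List<List<Integer>> inputBuffers)
            throws InterruptedException {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        List<Integer> EOF = new ArrayList<>(); // poison pill marking end of input
        List<List<Integer>> sorted = Collections.synchronizedList(new ArrayList<>());

        Thread readThread = new Thread(() -> {   // plays the "reading thread"
            try {
                for (List<Integer> buf : inputBuffers) {
                    queue.put(new ArrayList<>(buf)); // hand each buffer to the sorter
                }
                queue.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread sortThread = new Thread(() -> {   // plays the "sorting thread"
            try {
                List<Integer> buffer;
                while ((buffer = queue.take()) != EOF) {
                    Collections.sort(buffer);    // sort each buffer as it arrives
                    sorted.add(buffer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        readThread.start();
        sortThread.start();
        readThread.join();
        sortThread.join();
        return sorted;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Integer>> in = Arrays.asList(
                Arrays.asList(3, 1, 2), Arrays.asList(6, 5, 4));
        System.out.println(runPipeline(in)); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```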
>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Thanks for the explanation. Is there a way to force this behaviour in
>>>> a local environment (to try to debug the problem)?
>>>>
>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhueske@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> these files are used for spilling data to disk; in your case, sorted
>>>>> runs of records.
>>>>> Later, all these sorted runs (up to a fan-out threshold) are read and
>>>>> merged to produce a completely sorted record stream.
>>>>>
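The spill-and-merge behaviour Fabian describes is essentially an external merge sort: sort fixed-size runs in memory, spill each run to disk, then k-way merge the runs back into one sorted stream. A minimal, hypothetical sketch (invented names, integers instead of records, a tiny run size so spilling always happens; not Flink's actual SortMerger code):

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Simplified external merge sort: spill sorted runs to temp files,
// then merge them with a priority queue over one reader per run.
public class ExternalSortSketch {
    static final int RUN_SIZE = 4; // tiny on purpose, to force spilling

    public static List<Integer> externalSort(List<Integer> input) throws IOException {
        // Phase 1: produce sorted runs and spill each one to a temp file.
        List<Path> runs = new ArrayList<>();
        for (int i = 0; i < input.size(); i += RUN_SIZE) {
            List<Integer> run = new ArrayList<>(
                    input.subList(i, Math.min(i + RUN_SIZE, input.size())));
            Collections.sort(run);
            Path f = Files.createTempFile("run", ".tmp");
            try (BufferedWriter w = Files.newBufferedWriter(f)) {
                for (int v : run) { w.write(Integer.toString(v)); w.newLine(); }
            }
            runs.add(f);
        }
        // Phase 2: k-way merge. The heap holds {value, runIndex} pairs,
        // ordered by value, so the smallest head across all runs comes first.
        PriorityQueue<int[]> heap =
                new PriorityQueue<int[]>((a, b) -> Integer.compare(a[0], b[0]));
        List<BufferedReader> readers = new ArrayList<>();
        for (int r = 0; r < runs.size(); r++) {
            BufferedReader br = Files.newBufferedReader(runs.get(r));
            readers.add(br);
            String line = br.readLine();
            if (line != null) heap.add(new int[]{Integer.parseInt(line), r});
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            String line = readers.get(top[1]).readLine(); // refill from same run
            if (line != null) heap.add(new int[]{Integer.parseInt(line), top[1]});
        }
        for (BufferedReader br : readers) br.close();
        for (Path f : runs) Files.deleteIfExists(f);
        return out;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(externalSort(Arrays.asList(9, 1, 7, 3, 8, 2, 6, 4, 5)));
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The fan-out threshold Fabian mentions corresponds to limiting how many runs are merged at once (Flink's `maxNumFileHandles`); with more runs than that, intermediate merge passes are needed.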
>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> The error appears as soon as some TaskManager generates an
>>>>>> inputchannel file.
>>>>>> What are those files used for?
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> In another run of the job I got another exception. Could it be
>>>>>>> helpful?
>>>>>>>
>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>> serializer.
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>> ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>> serializer.
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>> Kryo serializer.
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>> ... 5 more
>>>>>>>
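For context on what "Serializer consumed more bytes than the record had" means in general: if a deserializer's read logic does not mirror the write logic exactly, it reads past the byte boundary of the record that was written, either hitting end-of-input or consuming the next record's bytes. A minimal, hypothetical sketch of that failure mode (invented names, plain java.io, not Flink's serializer stack):

```java
import java.io.*;

// Hypothetical sketch: the writer emits one int (4 bytes), but the reader
// consumes two ints (8 bytes), so it "consumes more bytes than the record
// had" and fails with EOFException inside the record.
public class BrokenSerializerSketch {

    static byte[] write(int value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(value); // the record is exactly 4 bytes long
        }
        return bos.toByteArray();
    }

    static int brokenRead(byte[] record) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            int v = in.readInt();
            in.readInt(); // BUG: reads 4 bytes past the end of the record
            return v;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write(42);
        try {
            brokenRead(record);
        } catch (EOFException e) {
            System.out.println("EOFException: read past the record boundary");
        }
    }
}
```

In a stream of back-to-back records, the same mismatch would instead misalign all subsequent records, which is consistent with the later ArrayIndexOutOfBoundsException in the trace above.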
>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>
>>>>>>>> In fact, the old problem was a missed re-initialization of the
>>>>>>>> KryoSerializer on the exception that triggered the spilling to disk.
>>>>>>>> This left a dirty serialization buffer that would eventually break
>>>>>>>> the program. Till worked on it by debugging the source code generating
>>>>>>>> the error. Perhaps someone could try the same this time, if Flavio can
>>>>>>>> make the problem reproducible as a shareable program plus data.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Stefano
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>> serializers, not in the sorter.
>>>>>>>>
>>>>>>>>
>>>>>>>> What types are you using at that point? The stack trace reveals Row
>>>>>>>> and StringValue. Are any other types involved?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to
>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>
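For reference, the setting mentioned here lives in flink-conf.yaml. A hedged sketch of the relevant Flink 1.2-era keys, using the values discussed in this thread (verify against your own configuration; the slot count is the one used in the failing 16-way run below):

```yaml
# flink-conf.yaml sketch (Flink 1.2-era keys, values as used in this thread)
# Cap the TaskManager's managed memory at 1024 MB so the sorter starts
# spilling to disk sooner, reproducing the failure faster.
taskmanager.memory.size: 1024
taskmanager.numberOfTaskSlots: 4
```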
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> I debugged the process a bit, repeating the job on sub-slices of
>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>> push-down filters), and all slices completed successfully :(
>>>>>>>>
>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>>>> see if stress was somehow a factor, but it didn't cause any error.
>>>>>>>>
>>>>>>>> Then I almost doubled the number of rows to process, and finally the
>>>>>>>> error showed up again.
>>>>>>>>
>>>>>>>> It seems somehow related to spilling to disk, but I can't really
>>>>>>>> understand what's going on :(
>>>>>>>>
>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>
>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4:
>>>>>>>>
>>>>>>>> id < 10.000.000.000                          => 1.857.365 rows  => OK
>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>>>
>>>>>>>> 4 TMs with 8 GB and 4 slots each, parallelism = 16:
>>>>>>>>
>>>>>>>> id >= 99.960.000.000  => 32.936.422 rows => OK
>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Any help is appreciated.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>> problem... How confident are you about it?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Looking at the git log of DataInputDeserializer.java, there have
>>>>>>>> been some recent changes.
>>>>>>>>
>>>>>>>>
>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error
>>>>>>>> is reproducible?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>>
>>>>>>>> I think I'm hitting the weird exception with the
>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>>
>>>>>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills
>>>>>>>> to disk, but the exception thrown is not very helpful. Any ideas?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>> exception: null
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: null
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.EOFException
>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>>
>> --
> Best,
> Kurt
>
