flink-user mailing list archives

From Kurt Young <ykt...@gmail.com>
Subject Re: UnilateralSortMerger error (again)
Date Thu, 27 Apr 2017 15:31:10 GMT
Hi, I have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
I will open a PR soon.

Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Thanks a lot Kurt!
>
> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt836@gmail.com> wrote:
>
>> Thanks for the test case, I will take a look at it.
>>
>> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>
>>> I've created a repository with a unit test that reproduces the error:
>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>> (this error is probably also related to FLINK-4719).
>>>
>>> The exception is thrown only when there are null strings and multiple
>>> slots per TM. I don't know whether UnilateralSortMerger is involved or
>>> not (but I think so).
>>> A quick fix for this problem would be much appreciated because it's
>>> blocking a production deployment.
>>>
>>> Thanks in advance to all,
>>> Flavio
>>>
>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> After digging into the code and tests, I think the problem is almost
>>>> certainly in the UnilateralSortMerger: there seems to be a missing
>>>> synchronization on some shared object somewhere. Right now I'm trying to
>>>> understand whether this section of code creates shared objects (like queues)
>>>> that are accessed in an unsafe way when there's spilling to disk:
>>>>
>>>> // start the thread that reads the input channels
>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>>         circularQueues, largeRecordHandler,
>>>>         parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>>
>>>> // start the thread that sorts the buffers
>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>         parentTask);
>>>>
>>>> // start the thread that handles spilling to secondary storage
>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>>>         parentTask,
>>>>         memoryManager, ioManager, serializerFactory, comparator,
>>>>         this.sortReadMemory, this.writeMemory,
>>>>         maxNumFileHandles);
>>>> ....
>>>> startThreads();
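
The hand-off that the snippet above sets up (a reading thread, a sorting thread and a spilling thread that share queues) can be sketched in plain Java as follows. This is only an illustration under assumptions: the class ThreeStageSortPipeline, its run method and the use of java.util.concurrent.BlockingQueue are hypothetical stand-ins, not Flink's UnilateralSortMerger or its circularQueues. It shows one common way to avoid the kind of unsynchronized sharing suspected here: every buffer crosses threads only through a thread-safe queue, so at any moment a buffer is owned by exactly one stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration; not Flink's UnilateralSortMerger. */
final class ThreeStageSortPipeline {

    /** Sentinel that marks the end of input; compared by identity, never by equals(). */
    private static final List<Integer> END = new ArrayList<>();

    /** Splits input into buffers, sorts each on a second thread, collects them on a third. */
    static List<List<Integer>> run(List<Integer> input, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        BlockingQueue<List<Integer>> toSort = new ArrayBlockingQueue<>(2);
        BlockingQueue<List<Integer>> toSpill = new ArrayBlockingQueue<>(2);
        List<List<Integer>> spilledRuns = Collections.synchronizedList(new ArrayList<>());

        // Stage 1: cut the input into fixed-size buffers and hand them to the sorter.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < input.size(); i += bufferSize) {
                    int end = Math.min(i + bufferSize, input.size());
                    toSort.put(new ArrayList<>(input.subList(i, end)));
                }
                toSort.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 2: sort each buffer in place, then pass it on.
        Thread sorter = new Thread(() -> {
            try {
                for (List<Integer> buf = toSort.take(); buf != END; buf = toSort.take()) {
                    Collections.sort(buf);
                    toSpill.put(buf);
                }
                toSpill.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 3: "spill" each sorted run (kept in memory here instead of written to disk).
        Thread spiller = new Thread(() -> {
            try {
                for (List<Integer> buf = toSpill.take(); buf != END; buf = toSpill.take()) {
                    spilledRuns.add(buf);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        sorter.start();
        spiller.start();
        try {
            reader.join();
            sorter.join();
            spiller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(spilledRuns);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(5, 3, 9, 1, 7, 2, 8), 3));
    }
}
```

Because each stage runs on a single thread and the queues are FIFO, the sorted runs come out in the order the buffers were read. The sketch does no error propagation between threads, which a real sorter would need.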
>>>>
>>>>
>>>> The problem is that the unit tests of GroupReduceDriver use Record, and
>>>> testing Rows is not very straightforward. I'm still trying to reproduce
>>>> the problem in a local env.
>>>>
>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Thanks for the explanation. Is there a way to force this behaviour in
>>>>> a local environment (to try to debug the problem)?
>>>>>
>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhueske@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> these files are used for spilling data to disk; in your case, sorted
>>>>>> runs of records.
>>>>>> Later all these sorted runs (up to a fan-out threshold) are read and
>>>>>> merged to get a completely sorted record stream.
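
The merge step described above can be sketched as a k-way merge over sorted runs with a cap on how many runs are merged at once. This is a minimal sketch under assumptions, not Flink's implementation: the class SortedRunMerger, its methods and the maxFanIn parameter are hypothetical names, the runs are in-memory lists rather than spill files, and merging in several passes when there are more runs than the cap is the usual external-sort approach rather than something stated in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration of merging sorted runs; not Flink code. */
final class SortedRunMerger {

    /** Pairs the current head of one run with the iterator over the rest of it. */
    private static final class Head {
        final int value;
        final Iterator<Integer> rest;

        Head(int value, Iterator<Integer> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    /** Merges already-sorted runs into one sorted list using a min-heap of run heads. */
    static List<Integer> mergeRuns(List<List<Integer>> runs) {
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> Integer.compare(a.value, b.value));
        for (List<Integer> run : runs) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    /** Merges at most maxFanIn runs per pass until a single sorted run remains. */
    static List<Integer> mergeWithFanIn(List<List<Integer>> runs, int maxFanIn) {
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2");
        }
        List<List<Integer>> current = new ArrayList<>(runs);
        while (current.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += maxFanIn) {
                int end = Math.min(i + maxFanIn, current.size());
                next.add(mergeRuns(current.subList(i, end)));
            }
            current = next;
        }
        return current.isEmpty() ? new ArrayList<>() : current.get(0);
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(0, 8));
        System.out.println(mergeWithFanIn(runs, 2)); // prints [0, 1, 2, 3, 4, 8, 9, 10]
    }
}
```

With three runs and a cap of 2, the first pass merges the first two runs and carries the third over, and the second pass merges the two remaining runs.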
>>>>>>
>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>> inputchannel file.
>>>>>>> What are those files used for?
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>> helpful?
>>>>>>>>
>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>> exception: Serializer consumed more bytes than the record had. This
>>>>>>>> indicates broken serialization. If you are using custom serialization
>>>>>>>> types (Value or Writable), check their serialization methods. If you
>>>>>>>> are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>> serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>> ... 5 more
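
The message "Serializer consumed more bytes than the record had" describes a reader running past the end of a length-delimited record. The sketch below shows that general failure mode with nullable string fields; it is a generic illustration, not the root cause identified in this thread and not Flink's wire format. Everything in it is an assumption made for the example: the class FramedRecordDemo, the one-byte null flag per field, and the ignoreNullFlags switch that simulates a reader whose view of which fields are null has gone wrong.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of a length-framed record with nullable fields; not Flink code. */
final class FramedRecordDemo {

    /** Writes [int payloadLength][payload]; each field is a null flag, then a UTF string if non-null. */
    static byte[] writeRecord(String[] fields) {
        try {
            ByteArrayOutputStream payload = new ByteArrayOutputStream();
            DataOutputStream p = new DataOutputStream(payload);
            for (String field : fields) {
                p.writeBoolean(field == null);
                if (field != null) {
                    p.writeUTF(field);
                }
            }
            p.flush();
            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(framed);
            out.writeInt(payload.size());
            out.write(payload.toByteArray());
            out.flush();
            return framed.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Reads one framed record. With ignoreNullFlags set, the reader wrongly treats every
     * field as non-null, which simulates a reader with a corrupted notion of the null fields.
     */
    static String[] readRecord(byte[] framed, int arity, boolean ignoreNullFlags) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            DataInputStream p = new DataInputStream(new ByteArrayInputStream(payload));
            String[] fields = new String[arity];
            for (int i = 0; i < arity; i++) {
                boolean isNull = p.readBoolean();
                fields[i] = (isNull && !ignoreNullFlags) ? null : p.readUTF();
            }
            if (p.available() != 0) {
                throw new IllegalStateException("reader consumed fewer bytes than the record had");
            }
            return fields;
        } catch (EOFException e) {
            throw new IllegalStateException("reader consumed more bytes than the record had", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = writeRecord(new String[] {"a", null, "b"});
        System.out.println(readRecord(record, 3, false).length); // correct reader: 3 fields
        try {
            readRecord(record, 3, true); // faulty reader runs off the end of the record
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the framing is that writer and reader must agree byte for byte on which fields are null; once they disagree, every later read is misaligned and the error surfaces far from its cause, typically as an EOF or an out-of-bounds read like the ones in the traces in this thread.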
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>>
>>>>>>>>> In fact, the old problem was that the KryoSerializer missed an
>>>>>>>>> initialization on the exception that triggers spilling to disk.
>>>>>>>>> This led to a dirty serialization buffer that would eventually break
>>>>>>>>> the program. Till worked on it by debugging the source code that
>>>>>>>>> generated the error. Perhaps someone could try the same this time,
>>>>>>>>> if Flavio can make the problem reproducible in a shareable program+data.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What types are you using at that point? The stack trace reveals
>>>>>>>>> Row and StringValue; are any other types involved?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to
>>>>>>>>> force spilling to disk) and the job failed almost immediately.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of
>>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>>> push-down filters) and all slices completed successfully :(
>>>>>>>>>
>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>> to see if this was somehow a stress factor, but it didn't cause any error.
>>>>>>>>>
>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>> the error showed up again.
>>>>>>>>>
>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>> understand what's going on :(
>>>>>>>>>
>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>
>>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>
>>>>>>>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>> 4 TM with 8 GB and 4 slots each, parallelism 16
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000  => 32.936.422 rows => OK
>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any help is appreciated..
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>> problem... how confident are you about it?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there has been
>>>>>>>>> some recent change.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>>>>>> reproducible?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I think I'm again hitting the weird Exception with the
>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>
>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink
>>>>>>>>> spills to disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>> exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>> --
>> Best,
>> Kurt
>>
>
>
