flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: UnilateralSortMerger error (again)
Date Fri, 21 Apr 2017 08:08:40 GMT
The types I read are:

[String, String, String, String, String, String, String, String, String,
Boolean, Long, Long, Long, Integer, Integer, Long, String, String, Long,
Long, String, Long, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String]
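The `EOFException` in the quoted stack trace below is thrown from `StringValue.readString`, i.e. while decoding one of the many String fields of this Row schema. As an illustrative sketch (not Flink's actual wire format), length-prefixed string deserialization surfaces only as a bare EOF when the byte stream is cut short mid-record, for example when writer and reader disagree about record boundaries after spilling:

```python
import io

def write_string(out: io.BytesIO, s: str) -> None:
    # Sketch of a length-prefixed encoding: 4-byte big-endian length, then bytes
    data = s.encode("utf-8")
    out.write(len(data).to_bytes(4, "big"))
    out.write(data)

def read_string(inp: io.BytesIO) -> str:
    header = inp.read(4)
    if len(header) < 4:
        raise EOFError("truncated length header")
    length = int.from_bytes(header, "big")
    data = inp.read(length)
    if len(data) < length:
        raise EOFError("truncated string payload")
    return data.decode("utf-8")

buf = io.BytesIO()
for field in ["foo", "bar"]:
    write_string(buf, field)

# An intact stream round-trips fine
buf.seek(0)
assert read_string(buf) == "foo"

# A stream cut short mid-record yields only an uninformative EOF,
# much like the 'terminated due to an exception: null' message below
truncated = io.BytesIO(buf.getvalue()[:9])  # cut inside the second record
read_string(truncated)                      # first record still reads
try:
    read_string(truncated)
except EOFError as e:
    print("EOF:", e)
```

This is only an analogy for why such failures point at a serializer/boundary bug rather than at the sorter itself.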

On Fri, Apr 21, 2017 at 10:03 AM, Stephan Ewen <sewen@apache.org> wrote:

> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
> What types are you using at that point? The stack trace reveals Row and
> StringValue; any other involved types?
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately.
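For reference, the setting mentioned above goes into flink-conf.yaml; in Flink 1.2 the value is in megabytes, and setting it fixes the managed memory pool to an absolute size instead of a fraction of the heap, so the sorter spills to disk sooner:

```
# flink-conf.yaml -- fix managed memory at 1024 MB (value from this thread)
taskmanager.memory.size: 1024
```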
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> I debugged the process a bit by repeating the job on a sub-slice of the
>>> entire data (using the id value to filter data with Parquet push-down
>>> filters), and all slices completed successfully :(
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> whether this was somehow a factor of stress, but it didn't cause any error.
>>> Then I almost doubled the number of rows to process and the error finally
>>> showed up again.
>>> It seems somehow related to spilling to disk, but I can't really
>>> understand what's going on :(
>>> This is a summary of my debug attempts:
>>>
>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>
>>> 4 TM with 8 GB and 4 slots each, parallelism = 16
>>>
>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
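One detail worth noting in the figures above: the failing run's row count is exactly the sum of the two slices that passed individually, which is consistent with the author's suspicion that the problem is volume/spilling-related rather than a corrupt record confined to one slice. A quick check:

```python
# Row counts copied from the debug summary above
ok_slice_1 = 23_888_750  # id >= 99.945.000.000 && id < 99.960.000.000 -> OK
ok_slice_2 = 32_936_422  # id >= 99.960.000.000                        -> OK
failing = 56_825_172     # id >= 99.945.000.000                        -> ERROR

# The failing range is the union of the two passing ranges
assert ok_slice_1 + ok_slice_2 == failing
print("combined rows:", ok_slice_1 + ok_slice_2)
```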
>>>
>>> Any help is appreciated..
>>> Best,
>>> Flavio
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> I could, but only if there's a good probability that it fixes the
>>>> problem... how confident are you about it?
>>>>
>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>>
>>>>> Looking at the git log of DataInputDeserializer.java, there have been
>>>>> some recent changes.
>>>>>
>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>> reproducible ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> I think I'm again hitting the weird Exception with the
>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills
>>>>>> to disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>   ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: null
>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>>   at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>   at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
