flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Unexpected out of bounds error in UnilateralSortMerger
Date Wed, 20 Jan 2016 17:05:51 GMT
You could change the version of Stephan’s branch via mvn versions:set
-DnewVersion=MyCustomBuildVersion and then mvn versions:commit. After you
install the Flink binaries, you can reference them in your project by
setting the version of your Flink dependencies to MyCustomBuildVersion.
That way, you can be sure that the right dependencies are used.
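
On the SBT side, a minimal build.sbt sketch of this could look like the
following. The artifact IDs are assumptions on my part (depending on the
branch they may carry a Scala version suffix), so keep whatever module list
your build already uses and only change the version and the resolver.

```scala
// build.sbt (sketch, not a complete build definition)

// Version string chosen with `mvn versions:set -DnewVersion=...` in the Flink checkout.
val flinkVersion = "MyCustomBuildVersion"

// `mvn install` puts artifacts into the local Maven repository (~/.m2/repository),
// so SBT has to be told to look there.
resolvers += Resolver.mavenLocal

// Assumed artifact IDs; replace them with the Flink modules your job depends on.
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala"   % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion,
  "org.apache.flink" % "flink-ml"      % flinkVersion
)
```

Since no published Flink release uses that version string, dependency
resolution should fail outright if SBT does not find the locally installed
build, instead of silently falling back to a remote snapshot.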

Alternatively, you could compile an example program with example input data
which can reproduce the problem. Then I could also take a look at it.

Cheers,
Till

On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> OK here's what I tried:
>
> * Build Flink (mvn clean install) from the branch you linked (kryo)
> * Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version,
> added local maven repo to resolvers so that it picks up the previously
> installed version (I hope)
> * Launch local cluster from newly built Flink, try to run job
>
> Still getting the same error.
>
> Is there a way to ensure that SBT is picking up the local version of Flink
> to build the uber-jar?
> Does it matter in this case, or is it enough that I'm sure the launched
> Flink instance comes from the branch you linked?
>
>
> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> The bug looks to be in the serialization via Kryo while spilling windows.
>> Note that Kryo is here used as a fallback serializer, since the
>> SparseVector is not a transparent type to Flink.
>>
>> I think there are two possible reasons:
>>   1) Kryo, or our Kryo setup, has an issue here
>>   2) Kryo is inconsistently configured. There are multiple Kryo instances
>> used across the serializers in the sorter. There may be a bug that they are
>> not initialized in sync.
>>
>>
>> To check this, can you build Flink with this pull request (
>> https://github.com/apache/flink/pull/1528) or from this branch (
>> https://github.com/StephanEwen/incubator-flink kryo) and see if that
>> fixes it?
>>
>>
>> Thanks,
>> Stephan
>>
>>
>>
>>
>>
>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>> theodoros.vasiloudis@gmail.com> wrote:
>>
>>> I haven't been able to reproduce this with other datasets. Taking a
>>> smaller sample from the large dataset I'm using (link to data
>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>> causes the same problem however.
>>>
>>> I'm wondering if the implementation of readLibSVM is what's wrong here.
>>> I've tried the new version committed recently by Chiwan, but I still get the
>>> same error.
>>>
>>> I'll see if I can spot a bug in readLibSVM.
>>>
>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>> theodoros.vasiloudis@gmail.com> wrote:
>>>
>>>> It's on 0.10.
>>>>
>>>> I've tried explicitly registering SparseVector (which is done anyway by
>>>> registerFlinkMLTypes
>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
>>>> which is called when the SVM predict or evaluate functions are called
>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
>>>> in my job but I still get the same. I will try a couple different datasets
>>>> and try to see if it's the number of features that is causing this or
>>>> something else.
>>>>
>>>> So far it works fine for a dataset with 8 features, but the large one
>>>> has 2000 and I get the above error there. I will try large datasets with a
>>>> few features and small datasets with many features as well.
>>>>
>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <sewen@apache.org>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>
>>>>> It is probably an incorrectly configured Kryo instance (not a problem
>>>>> of the sorter).
>>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>>> there should be no reference resolution during serialization /
>>>>> deserialization.
>>>>>
>>>>> Can you try what happens when you explicitly register the type
>>>>> SparseVector at the ExecutionEnvironment?
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I'm trying to run a job using FlinkML and I'm confused about the
>>>>>> source of an error.
>>>>>>
>>>>>> The job reads a libSVM formatted file and trains an SVM classifier on
>>>>>> it.
>>>>>>
>>>>>> I've tried this with small datasets and everything works out fine.
>>>>>>
>>>>>> When trying to run the same job on a large dataset (~11GB
>>>>>> uncompressed) however, I get the following error:
>>>>>>
>>>>>>
>>>>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>> Serialization trace:
>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>> Serialization trace:
>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>> Serialization trace:
>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>>         ... 10 more
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any idea what might be causing this? I'm running the job in local
>>>>>> mode, 1 TM with 8 slots and ~32GB heap size.
>>>>>>
>>>>>> All the vectors created by the libSVM loader have the correct size.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
