flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefano Bortoli <bort...@okkam.it>
Subject Re: kryo exception due to race condition
Date Tue, 06 Oct 2015 07:21:00 GMT
Hi guys, I could manage to complete the process crossing byte arrays I
deserialize within the group function. However, I think this workaround is
feasible just with relatively simple processes. Any idea/plan about to fix
the serialization problem?

saluti,
Stefano

Stefano Bortoli, PhD

*ENS Technical Director *_______________________________________________
*OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*

*Email:* bortoli@okkam.it

*Phone nr: +39 0461 1823913 *

*Headquarters:* Trento (Italy), Via Trener 8
*Registered office:* Trento (Italy), via Segantini 23

Confidentially notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(S). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.

2015-10-02 12:05 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:

> I don't know whether it is the same issue, but after switching from my
> POJOs to BSONObject I have got a race condition issue with kryo
> serialization.
> I could complete the process using the byte[], but at this point I
> actually need the POJO. I truly believe it is related to the reuse of the
> Kryo instance, which is not thread safe.
>
>
> ------------------------------------------------------------------------------------------------------
> 2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
> to FAILED
> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>     at java.util.ArrayList.get(ArrayList.java:411)
>     at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>     at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at
> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>     at
> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>     at
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>     at
> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>     at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>     at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
>
> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>
>> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>
>> saluti,
>> Stefano
>>
>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>
>>> This looks to me like a bug where type registrations are not properly
>>> forwarded to all Serializers.
>>>
>>> Can you open a JIRA ticket for this?
>>>
>>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I hit a Kryo exception while running a process 'crossing' POJOs
>>>> datasets. I am using the 0.10-milestone-1.
>>>> Checking the serializer:
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>
>>>> I have noticed that the Kryo instance is reused along serialization
>>>> calls (e.g. line 187).  However, Kryo is not threadsafe, and therefore I
>>>> think it may cause the problem due to possible race condition. We had these
>>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it
>>>> should just a matter of calling the
>>>>
>>>> what should I do? Open a ticket?
>>>>
>>>> Thanks a lot guys for the great job!
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> -----------------------------------------
>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>>> ID: 114
>>>>     at
>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>     at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>     at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>>>     at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>     at
>>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>>>     at
>>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>>>     at
>>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>>>     at
>>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>>>     at
>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>>>     at
>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>
>>>
>>
>

Mime
View raw message