flink-issues mailing list archives

From "Stefano Bortoli (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2800) kryo serialization problem
Date Tue, 06 Oct 2015 07:55:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944646#comment-14944646 ]

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

I hit the problem with both my own POJO and BSONObject as the second field of the Tuple2 when
crossing data read from MongoDB. The code itself is quite simple:

{code}
// wrap the MongoDB mapreduce InputFormat in Flink's HadoopInputFormat
InputFormat<Object, BSONObject> mapreduceInputFormat = new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
		mapreduceInputFormat, Object.class, BSONObject.class, job);

// specify connection parameters
hdIf.getConfiguration().set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);

DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);
DataSet<Tuple2<String, BSONObject>> pois = input.flatMap(
		new GetEntitonForClass(CulturalSitePointOfInterest.class.getSimpleName()));
DataSet<Tuple2<String, BSONObject>> cdas = input.flatMap(
		new GetEntitonForClass(CulturalDigitalArtefact.class.getSimpleName()));

// cross POIs with CDAs (this is the operator that fails in the trace below)
DataSet<Tuple2<String, String>> out = pois.cross(cdas)
		.combineGroup(new POI2CDACombineGroupFunction())
		.distinct()
		.setParallelism(parallelism);

DataSet<Tuple3<String, String, BSONObject>> union = out.join(pois)
		.where(0).equalTo(0)
		.projectFirst(0, 1).projectSecond(1);

DataSet<Tuple2<BSONWritable, MongoUpdateWritable>> writable = union.groupBy(0)
		.reduceGroup(new AddProxy2POIReduceGroupFunction())
		.setParallelism(parallelism);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
// emit result (this works only locally)
writable.output(new MongoHadoop2OutputFormat<BSONWritable, MongoUpdateWritable>(
		new MongoOutputFormat<BSONWritable, MongoUpdateWritable>(), job));
// execute program
env.execute("Mongodb POI to CDA linker");
{code}

> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: master
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>
> Performing a cross of two datasets of POJOs, I got the exception below. The first time I ran the process there was no problem; the second time I ran it, I got the exception. My guess is that it could be a race condition related to the reuse of the Kryo serializer object. However, it could also be "a bug where type registrations are not properly forwarded to all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO  JobClient:161 - 10/01/2015 18:18:21	Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> 	at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> 	at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> 	at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> 	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> 	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> 	at java.lang.Thread.run(Thread.java:745)
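
To make the second hypothesis in the issue description concrete (type registrations not reaching every serializer), here is a minimal, self-contained sketch of how an ID-based class registry can fail on the reading side. It does not use Kryo or Flink: `ClassIdRegistry` and `RegistryMismatchDemo` are hypothetical names, and the sketch assumes IDs are simply handed out in registration order. It only illustrates the failure mode and is not a diagnosis of FLINK-2800.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an ID-based class resolver. This is NOT Kryo's API.
final class ClassIdRegistry {
    private final Map<Class<?>, Integer> idsByClass = new HashMap<Class<?>, Integer>();
    private final List<Class<?>> classesById = new ArrayList<Class<?>>();

    // Registers a class and returns its ID. IDs follow registration order (assumption).
    int register(Class<?> type) {
        Integer existing = idsByClass.get(type);
        if (existing != null) {
            return existing;
        }
        int id = classesById.size();
        classesById.add(type);
        idsByClass.put(type, id);
        return id;
    }

    // Writer side: look up the ID that would be written in place of the class name.
    int idOf(Class<?> type) {
        Integer id = idsByClass.get(type);
        if (id == null) {
            throw new IllegalArgumentException("Class is not registered: " + type.getName());
        }
        return id;
    }

    // Reader side: resolve an ID back to a class, failing if this registry never saw it.
    Class<?> classOf(int id) {
        if (id < 0 || id >= classesById.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return classesById.get(id);
    }
}

final class RegistryMismatchDemo {
    public static void main(String[] args) {
        // The writing serializer knows about two types.
        ClassIdRegistry writer = new ClassIdRegistry();
        writer.register(String.class);
        writer.register(StringBuilder.class);

        // The reading serializer received only the first registration.
        ClassIdRegistry reader = new ClassIdRegistry();
        reader.register(String.class);

        int writtenId = writer.idOf(StringBuilder.class);
        try {
            reader.classOf(writtenId);
        } catch (IllegalStateException e) {
            // The reader cannot map the ID back to a class.
            System.out.println(e.getMessage());
        }
    }
}
```

In this toy model the reader fails as soon as it meets an ID that only the writer registered, which mirrors the shape of the exception message in the trace above. Whether the real failure comes from missing registrations or from concurrent reuse of a serializer instance is left open by the report.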



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
