flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Gelly - generics with custom vertex value
Date Wed, 10 May 2017 16:18:55 GMT
Looks like java.util.ArrayList$SubList does not work out of the box with
Kryo / Flink.

Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc <marc.kaepke@haw-hamburg.de>
wrote:

> Hi,
>
> a part of my bachelor thesis is an implementation of the Semi-Clustering
> algorithm [1].
> I’m using the Scatter-Gather-Iteration. Each vertex has to know its
> neighbors and the edge-value between of that. Because Gelly’s vertex
> doesn’t provide both information, I wrote an CustomVertexValue class.  An
> object contains a set of edges and a list of another custom class called
> SemiCluster, which contains a list of vertex and a double value.
> Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.
>
> Unfortunately I get an exception if I run my Scatter-Gather-Iteration.
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
> Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$
> mcV$sp(JobManager.scala:900)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDr
> iver.run(CoGroupWithSolutionSetSecondDriver.java:203)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.NullPointerException
> at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
> at java.util.ArrayList$SubList.size(ArrayList.java:1040)
> at java.util.AbstractList.add(AbstractList.java:108)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:246)
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.deserialize(TupleSerializer.java:144)
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:109)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:973)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
>
> Process finished with exit code 1
>
> ———————————
>
> Maybe Gelly isn’t able to handle my CustomVertexValue!?
>
> The ScatterFunctions works fine.
>
>
> Thanks a lot
> Best,
> Marc
>
>
> [1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/
> sigmod10/p135-malewicz.pdf chapter 5.4 (page 141)
>
>
>

Mime
View raw message