flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Crash on DataSet.collect()
Date Mon, 04 May 2015 12:32:37 GMT
Hi Flavio!

This issue is known and has been fixed already. It occurs when you use
custom types in collect, because it uses the wrong classloader/serializer
to transfer them.

The current master should not have this issue any more.

Greetings,
Stephan


On Mon, May 4, 2015 at 2:09 PM, Flavio Baronti <f.baronti@list-group.com>
wrote:

> Hello,
>
> I'm testing the new DataSet.collect() method on version 0.9-milestone-1,
> but
> I obtain the following error on cluster execution (no problem with local
> execution), which also causes the job manager to crash:
>
> 14:05:41,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at
> main(Test01.java:54)) -> F
> latMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
> 14:05:41,211 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying DataSink
> (org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16) (1/1)
> (attemp
> t #0) to india3
> 14:05:41,269 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May
> 04 14:05:39 CEST 201
> 5) changed to FINISHED .
> 14:05:41,284 ERROR akka.actor.OneForOneStrategy
> - java.io.StreamCorruptedException: invalid type code: 00
> org.apache.commons.lang3.SerializationException:
> java.io.StreamCorruptedException: invalid type code: 00
>         at
>
> org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
> ava:232)
>         at
>
> org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
> ava:268)
>         at
>
> org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListA
> ccumulator.java:51)
>         at
>
> org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListA
> ccumulator.java:35)
>         at
>
> org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAc
> cumulatorResults(AccumulatorManager.java:77)
>         at
>
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessag
> es$1.applyOrElse(JobManager.scala:300)
>         at
>
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialF
> unction.scala:33)
>         at
>
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.
> scala:33)
>         at
>
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.
> scala:25)
>         at
>
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.sca
> la:37)
>         at
>
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.sca
> la:30)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
>
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessag
> es.scala:30)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
>
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scal
> a:91)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool
> .java:1253)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1
> 346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java
> :107)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
>         at
> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at
> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at
>
> org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
> ava:224)
>         ... 24 more
> 14:05:41,290 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Stopping JobManager akka://flink/user/jobmanager#-828467473.
> 14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager
> - Actor akka://flink/user/jobmanager#-828467473 terminated, stopping
> process...
>
> Is this a known issue? Am I doing something wrong?
>
> Thanks
> Flavio
>
>
>

Mime
View raw message