flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Bug broadcasting objects (serialization issue)
Date Wed, 02 Sep 2015 12:29:33 GMT
We should try to improve the exception here. More people will run into this
issue, and the exception message should help them understand what went wrong.

How about we do eager serialization into a set of byte arrays? Then the
serializability issue comes immediately when the program is constructed,
rather than later, when it is shipped.
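
Roughly what I have in mind, as a sketch only (the helper name and its
placement are made up, this is not existing Flink code): serialize the user
object right away, so a non-serializable value fails with a clear message
while the program is being constructed.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EagerSerializationSketch {

    // Serializes the value immediately; a failure then surfaces at program
    // construction time instead of at job submission / shipping time.
    public static byte[] serializeEagerly(Serializable value, String what) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "The object given for '" + what + "' could not be serialized. "
                    + "Make sure the class and all of its fields are serializable.", e);
        }
    }
}

The input format or broadcast variable would then only hold the byte arrays
and deserialize them on the task managers.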

On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <mxm@apache.org> wrote:

> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>
> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <mxm@apache.org>
> wrote:
>
>> Hi Andreas,
>>
>> Thank you for reporting the problem and including code to reproduce it. I
>> think there is a problem with the serialization or deserialization of the
>> class. Arrays.asList uses a private ArrayList class
>> (java.util.Arrays$ArrayList), which is not the one you would normally use
>> (java.util.ArrayList).
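>>
>> You can see the difference with a quick standalone check (illustration
>> only, not taken from your program):
>>
>> import java.util.ArrayList;
>> import java.util.Arrays;
>> import java.util.List;
>>
>> public class ListClassCheck {
>>     public static void main(String[] args) {
>>         List<Integer> a = Arrays.asList(0, 0, 0);
>>         List<Integer> b = new ArrayList<>(a);
>>         System.out.println(a.getClass().getName()); // java.util.Arrays$ArrayList
>>         System.out.println(b.getClass().getName()); // java.util.ArrayList
>>     }
>> }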
>>
>> I'll create a JIRA issue to keep track of the problem and to investigate
>> further.
>>
>> Best regards,
>> Max
>>
>> Here's the stack trace:
>>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
>> task 'DataSource (at main(Test.java:32)
>> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
>> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>     at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>     at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org
>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>     at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>     at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>     at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.Exception: Deserializing the InputFormat
>> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>     at
>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>>     at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>     ... 25 more
>> Caused by: java.lang.IllegalStateException: unread block data
>>     at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>>     at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>     at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>     at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>     at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>     at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>     at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>>     at
>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>>     ... 26 more
>>
>> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa <andres@cs.aau.dk>
>> wrote:
>>
>>> Hi,
>>>
>>> I get a bug when trying to broadcast a list of integers created with the
>>> Arrays.asList(...) method.
>>>
>>> For example, if you try to run this "wordcount" example, you can
>>> reproduce the bug.
>>>
>>>
>>> import java.io.Serializable;
>>> import java.util.Arrays;
>>> import java.util.List;
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class WordCountExample {
>>>     public static void main(String[] args) throws Exception {
>>>         final ExecutionEnvironment env =
>>>                 ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         DataSet<String> text = env.fromElements(
>>>                 "Who's there?",
>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>
>>>         // This list is a java.util.Arrays$ArrayList, not a java.util.ArrayList.
>>>         List<Integer> elements = Arrays.asList(0, 0, 0);
>>>
>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>
>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>                 .flatMap(new LineSplitter())
>>>                 .withBroadcastSet(set, "set")
>>>                 .groupBy(0)
>>>                 .sum(1);
>>>
>>>         wordCounts.print();
>>>     }
>>>
>>>     public static class LineSplitter implements
>>>             FlatMapFunction<String, Tuple2<String, Integer>> {
>>>         @Override
>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>             for (String word : line.split(" ")) {
>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>             }
>>>         }
>>>     }
>>>
>>>     public static class TestClass implements Serializable {
>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>
>>>         List<Integer> integerList;
>>>
>>>         public TestClass(List<Integer> integerList) {
>>>             this.integerList = integerList;
>>>         }
>>>     }
>>> }
>>>
>>>
>>> However, if we use the ArrayList<> constructor instead of
>>> Arrays.asList(...), there is no problem at all!
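>>>
>>> In other words, replacing that one line in the example above with the
>>> following (and importing java.util.ArrayList) makes the job run fine:
>>>
>>>         // copy into a plain java.util.ArrayList before handing it to Flink
>>>         List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));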
>>>
>>>
>>> Regards,
>>> Andres
>>>
>>
>>
>
