flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Bug broadcasting objects (serialization issue)
Date Wed, 02 Sep 2015 14:16:31 GMT
Ok but that would not prevent the above error, right? Serializing is
not the issue here.

Nevertheless, it would catch all errors during initial serialization.
Deserializing has its own hazards due to possible Classloader issues.
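
A minimal sketch of the client-side check discussed in this thread, assuming plain java.io serialization: serialize eagerly into a byte array, then deserialize once to confirm the bytes can be read back. The names RoundTripCheck and checkedSerialize are hypothetical and are not part of Flink. The check runs with the client's class loader, so it would not necessarily reveal class loader problems that only appear on the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper, not a Flink class.
final class RoundTripCheck {

    /** Serializes eagerly, so a non-serializable object fails right here. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Reads the object back using the current (client) class loader. */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Serializes and then deserializes once, returning the bytes if both steps succeed. */
    static byte[] checkedSerialize(Serializable obj) throws IOException, ClassNotFoundException {
        byte[] data = serialize(obj);
        deserialize(data); // surfaces read-side failures at construction time
        return data;
    }

    public static void main(String[] args) throws Exception {
        ArrayList<Integer> list = new ArrayList<>(Arrays.asList(0, 0, 0));
        byte[] data = checkedSerialize(list);
        System.out.println(deserialize(data)); // prints [0, 0, 0]
    }
}
```

Calling checkedSerialize when the program is constructed moves both kinds of failure to that point, instead of the moment the job is shipped.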

On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen <sewen@apache.org> wrote:
> Yes, even serialize in the constructor. Then the failure (if serialization
> does not work) comes immediately.
>
> On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels <mxm@apache.org> wrote:
>>
>> Nice suggestion. So you want to serialize and deserialize the InputFormats
>> on the Client to check whether they can be transferred correctly? Merely
>> serializing is not enough because the above Exception occurs during
>> deserialization.
>>
>> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>> We should try to improve the exception here. More people will run into
>>> this issue and the exception should help them understand it well.
>>>
>>> How about we do eager serialization into a set of byte arrays? Then the
>>> serializability issue comes immediately when the program is constructed,
>>> rather than later, when it is shipped.
>>>
>>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <mxm@apache.org>
>>> wrote:
>>>>
>>>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>>>
>>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <mxm@apache.org>
>>>> wrote:
>>>>>
>>>>> Hi Andres,
>>>>>
>>>>> Thank you for reporting the problem and including the code to reproduce
>>>>> it. I think the problem lies in the serialization or deserialization of
>>>>> the class. Arrays.asList uses a private ArrayList class
>>>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>>>> (java.util.ArrayList).
>>>>>
>>>>> I'll create a JIRA issue to keep track of the problem and to
>>>>> investigate further.
>>>>>
>>>>> Best regards,
>>>>> Max
>>>>>
>>>>> Here's the stack trace:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>>>>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>>>>     ... 25 more
>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>>>>>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>>>>>     ... 26 more
>>>>>
>>>>> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa <andres@cs.aau.dk>
>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I hit a bug when trying to broadcast a list of integers created with
>>>>>> the method "Arrays.asList(...)".
>>>>>>
>>>>>> For example, if you try to run this "wordcount" example, you can
>>>>>> reproduce the bug.
>>>>>>
>>>>>>
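>>>>>> import java.io.Serializable;
>>>>>> import java.util.Arrays;
>>>>>> import java.util.List;
>>>>>>
>>>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>>> import org.apache.flink.api.java.DataSet;
>>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>> import org.apache.flink.util.Collector;
>>>>>>
>>>>>> public class WordCountExample {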
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         final ExecutionEnvironment env =
>>>>>>                 ExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>>         DataSet<String> text = env.fromElements(
>>>>>>                 "Who's there?",
>>>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>>>
>>>>>>         List<Integer> elements = Arrays.asList(0, 0, 0);
>>>>>>
>>>>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>>>>
>>>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>>>                 .flatMap(new LineSplitter())
>>>>>>                 .withBroadcastSet(set, "set")
>>>>>>                 .groupBy(0)
>>>>>>                 .sum(1);
>>>>>>
>>>>>>         wordCounts.print();
>>>>>>     }
>>>>>>
>>>>>>     public static class LineSplitter
>>>>>>             implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>>>>         @Override
>>>>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>>>>             for (String word : line.split(" ")) {
>>>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static class TestClass implements Serializable {
>>>>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>>>>
>>>>>>         List<Integer> integerList;
>>>>>>         public TestClass(List<Integer> integerList){
>>>>>>             this.integerList=integerList;
>>>>>>         }
>>>>>>
>>>>>>
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> However, if we use the ArrayList<> constructor instead of
>>>>>> "Arrays.asList(...)", there isn't any problem!
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Andres
>>>>>
>>>>>
>>>>
>>>
>>
>
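
As a small illustration of the class difference pointed out in the thread, the sketch below prints the runtime class of a list returned by Arrays.asList and of a copy made with the ArrayList constructor. ListClassDemo and toPlainArrayList are hypothetical names used only for this example. Copying into a java.util.ArrayList is the variant the original report describes as working; the sketch does not reproduce the Flink failure itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the reported program.
final class ListClassDemo {

    /** Copies any list into a plain java.util.ArrayList. */
    static <T> List<T> toPlainArrayList(List<T> list) {
        return new ArrayList<>(list);
    }

    public static void main(String[] args) {
        List<Integer> fixedSize = Arrays.asList(0, 0, 0);
        List<Integer> plain = toPlainArrayList(fixedSize);

        // Arrays.asList returns a private nested class of java.util.Arrays.
        System.out.println(fixedSize.getClass().getName()); // java.util.Arrays$ArrayList
        System.out.println(plain.getClass().getName());     // java.util.ArrayList
    }
}
```

In the reported program, the same copy could be made inside the TestClass constructor, for example by assigning new ArrayList<>(integerList) to the field, so callers may keep passing Arrays.asList(...).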
