flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: Random Selection
Date Tue, 23 Jun 2015 09:07:59 GMT
No clue. I used the current branch aka 0.9-SNAPSHOT.
Or is this something related to Scala?

On Mon, Jun 22, 2015 at 4:45 PM, Stephan Ewen <sewen@apache.org> wrote:

> Actually, the closure cleaner is supposed to take care of the "anonymous
> inner class" situation.
>
> Did you deactivate that one, by any chance?
>
> On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber
> <alber.maximilian@gmail.com> wrote:
>
>> Hi everyone!
>> Thanks! It seems the variable is what causes the problem. Making an inner
>> class solved the issue.
>> Cheers,
>> Max
>>
>> On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian
>> <Sebastian.Kruse@hpi.de> wrote:
>>
>>>  Hi everyone,
>>>
>>>
>>>
>>> I did not reproduce it, but I think the problem here is rather the
>>> anonymous class. It looks like it is created within a class, not an object.
>>> Thus it is not “static” in Java terms, which means that its
>>> surrounding class (the job class) will also be serialized. And in this job
>>> class, there seems to be a DataSet field that cannot be serialized.
>>>
>>>
>>>
>>> If that really is the problem, you should either define your anonymous
>>> class within the companion object of your job class or resort directly to a
>>> function (and make sure that you do not pass a variable from your job class
>>> into the scope of the function).
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Sebastian
>>>
>>>
>>>
>>> *From:* Till Rohrmann [mailto:trohrmann@apache.org]
>>> *Sent:* Monday, 15 June 2015 14:16
>>> *To:* user@flink.apache.org
>>> *Subject:* Re: Random Selection
>>>
>>>
>>>
>>> Hi Max,
>>>
>>> the problem is that you’re trying to serialize the companion object of
>>> scala.util.Random. Try to create an instance of the scala.util.Random
>>> class and use this instance within your RichFilterFunction to generate
>>> the random numbers.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber
>>> <alber.maximilian@gmail.com> wrote:
>>>
>>>    Hi Flinksters,
>>>
>>> I would like to randomly choose an element of my data set. But somehow I
>>> cannot use scala.util.Random inside my filter functions:
>>>
>>>       val sample_x = X filter(new RichFilterFunction[Vector](){
>>>         var i: Int = -1
>>>
>>>         override def open(config: Configuration) = {
>>>           i = scala.util.Random.nextInt(N)
>>>         }
>>>         def filter(a: Vector) = a.id == i
>>>       })
>>>       val sample_y = Y filter(new RichFilterFunction[Vector](){
>>>         def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
>>>       })
>>>
>>> That's the error I get:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> An error occurred while translating the optimized plan to a nephele
>>> JobGraph: Error translating node 'Filter "Filter at
>>> Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
>>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>>>     at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>>>     at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at
>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>>     at
>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>>>     at
>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>>>     at
>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>     at
>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>>     at Test$delayedInit$body.apply(test.scala:304)
>>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>>>     at
>>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>     at scala.App$anonfun$main$1.apply(App.scala:71)
>>>     at scala.App$anonfun$main$1.apply(App.scala:71)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at
>>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>>>     at scala.App$class.main(App.scala:71)
>>>     at Test$.main(test.scala:45)
>>>     at Test.main(test.scala)
>>> Caused by: org.apache.flink.optimizer.CompilerException: Error
>>> translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)"
>>> : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[
>>> LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not
>>> write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>>     at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>     at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>>>     at
>>> org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>>>     ... 21 more
>>> Caused by:
>>> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>>> Could not write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at
>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>>>     at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>>>     ... 26 more
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.flink.api.scala.DataSet
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>     at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>>     at
>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>>     at
>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>
>>>  Did I miss something, or is it simply not possible?
>>>
>>> Thanks!
>>>
>>> Cheers,
>>>
>>> Max
>>>
>>>
>>
>>
>
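
The diagnosis in this thread (an anonymous class created inside a class drags its enclosing instance into serialization) can be reproduced without Flink. The following is only a minimal sketch under stated assumptions: Predicate, Handle, Job and SerializationCheck are hypothetical names made up for illustration, with Predicate standing in for a serializable filter function and Handle standing in for a non-serializable field such as a DataSet. It uses plain Java serialization, not Flink's own code path.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a serializable user function (not a Flink type).
trait Predicate[T] extends Serializable {
  def filter(value: T): Boolean
}

// Hypothetical stand-in for a non-serializable field such as a DataSet.
class Handle

// Failing shape: the anonymous class reads `n` from the enclosing instance,
// so it holds a reference to the whole Job, including the `data` field.
class Job(val n: Int) extends Serializable {
  val data = new Handle
  def capturing: Predicate[Int] = new Predicate[Int] {
    def filter(value: Int): Boolean = value < n
  }
}

// Suggested shape: the predicate is created in the companion object and
// receives only the plain value it needs as a parameter.
object Job {
  def standalone(limit: Int): Predicate[Int] = new Predicate[Int] {
    def filter(value: Int): Boolean = value < limit
  }
}

object SerializationCheck {
  // Returns true if Java serialization accepts the object, false if it
  // throws NotSerializableException.
  def serializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling SerializationCheck.serializable(new Job(10).capturing) should report a failure because of the captured Handle, while SerializationCheck.serializable(Job.standalone(10)) should succeed. Passing the value as a parameter is one way to follow the advice above about not pulling a variable from the job class into the function's scope.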
