flink-user mailing list archives

From: Till Rohrmann <trohrm...@apache.org>
Subject: Re: Random Selection
Date: Mon, 15 Jun 2015 12:15:54 GMT
Hi Max,

the problem is that you’re trying to serialize the companion object of
scala.util.Random. Try to create an instance of the scala.util.Random class
and use this instance within your RichFilterFunction to generate the random
numbers.
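A minimal sketch of that suggestion, assuming the goal is to pick exactly one element by its `id`. It deliberately leaves Flink out so it runs with only the Scala standard library: `Point`, `RandomIdFilter`, `roundTrip` and `selectOne` are hypothetical names, and `roundTrip` only approximates how user functions are shipped to workers via Java serialization. In a real job the class would extend `RichFilterFunction[Vector]` and the initialization would go in `open(config: Configuration)`, as in the question's first filter.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical element type standing in for the question's Vector class.
case class Point(id: Int)

// Hypothetical function object shaped like a Flink RichFilterFunction:
// open() runs once per parallel copy, filter() runs once per element.
// Only n and seed are serialized; no Random is held across serialization.
class RandomIdFilter(n: Int, seed: Long) extends Serializable {
  // Runtime state, set in open(); marked transient so it is not serialized.
  @transient private var chosenId: Int = -1

  def open(): Unit = {
    // A scala.util.Random *instance* with a fixed seed, so every
    // parallel copy draws the same index.
    val rng = new scala.util.Random(seed)
    chosenId = rng.nextInt(n)
  }

  def filter(p: Point): Boolean = p.id == chosenId
}

object RandomSelection {
  // Round-trips an object through Java serialization, roughly what happens
  // when user functions are shipped to workers.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Simulates `parallelism` workers, each filtering its own chunk of the data
  // with its own deserialized copy of the function.
  def selectOne(data: Seq[Point], n: Int, seed: Long, parallelism: Int): List[Point] = {
    val prototype = new RandomIdFilter(n, seed)
    val chunkSize = math.max(1, (data.size + parallelism - 1) / parallelism)
    data.grouped(chunkSize).toList.flatMap { chunk =>
      val f = roundTrip(prototype) // each worker gets its own copy
      f.open()                     // and initializes it locally
      chunk.filter(f.filter)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = (0 until 10).map(Point(_))
    println(selectOne(data, n = 10, seed = 42L, parallelism = 4))
  }
}
```

The fixed seed matters: `open()` is called once per parallel instance, so an unseeded generator could let each instance pick a different id and the filter could return zero or several elements. Drawing the number inside `filter()`, as in the question's second function, compares each element against a fresh random number and yields a random subset rather than a single element. Defining the function as a top-level class, rather than an anonymous class nested in other code, may also help keep it from capturing enclosing values such as the `DataSet` named in the exception.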

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber <alber.maximilian@gmail.com>
wrote:

> Hi Flinksters,
>
> I would like to randomly choose an element of my data set, but somehow I
> cannot use scala.util inside my filter functions:
>
>       val sample_x = X filter(new RichFilterFunction[Vector](){
>         var i: Int = -1
>
>         override def open(config: Configuration) = {
>           i = scala.util.Random.nextInt(N)
>         }
>         def filter(a: Vector) = a.id == i
>       })
>       val sample_y = Y filter(new RichFilterFunction[Vector](){
>         def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
>       })
>
> That's the error I get:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> An error occurred while translating the optimized plan to a nephele
> JobGraph: Error translating node 'Filter "Filter at
> Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>     at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>     at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>     at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>     at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>     at Test$delayedInit$body.apply(test.scala:304)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(test.scala:45)
>     at Test.main(test.scala)
> Caused by: org.apache.flink.optimizer.CompilerException: Error translating
> node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP
> [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
> [ordering=null, grouped=null, unique=null] ]]': Could not write the user
> code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>     at
> org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>     ... 21 more
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>     ... 26 more
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>     at
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>     at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>
>
> Did I miss something, or is it simply not possible?
> Thanks!
> Cheers,
> Max
>
