flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Illegal State in Bulk Iteration
Date Mon, 01 Dec 2014 15:47:15 GMT
Hi!

I have been trying to re-implement the program you sent here (the code is
incomplete), but I cannot trigger the exception. Can you send us the
complete example?

Stephan


On Fri, Nov 28, 2014 at 3:18 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Flinksters!
>
> I try to write a BulkIteration. Somehow I get a cryptic error message, at
> least I have no clue what's wrong:
>
> Code:
>
> var width = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions)
> * config.startWidth)) map {x => new Vector(0, x.values)}
> var update = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions)
> * 0.01F)) map {x => new Vector(1, x.values)}
> var lastGradient =
> env.fromCollection[Vector](Seq(Vector.zeros(config.dimensions))) map {x =>
> new Vector(2, x.values)}
>
> var stepSet = width union update union lastGradient
> stepSet = stepSet.iterate(config.gradientDescentIterations){
>     stepSet =>
>     var width = stepSet filter {_.id == 0}
>     var update = stepSet filter {_.id == 1}
>     var lastGradient = stepSet filter {_.id == 2}
>
>     val gradient = getGradient(X, residual, center, width)
>     val term = gradient * lastGradient
>     lastGradient = gradient
>
>     update = update.map(new RichMapFunction[Vector, Vector]{
>       var term: Vector = null
>       val minWidthUpdate = 0.00000001F
>       val maxWidthUpdate = 10.0F
>       override def open(config: Configuration) = {
>        term = getRuntimeContext.getBroadcastVariable("term").toList.head
>     }
>
>     def map(x: Vector) = {x.condMul(term.isLess(0),
> 0.5F).condMul(term.isGreater(0), 1.2F).clip(minWidthUpdate, maxWidthUpdate)}
>     }).withBroadcastSet(term, "term")
>     /*
>     width = width.map(new RichMapFunction[Vector, Vector]{
>       var update: Vector = null
>       var gradient: Vector = null
>       override def open(config: Configuration) = {
>         update =
> getRuntimeContext.getBroadcastVariable("update").toList.head
>        gradient =
> getRuntimeContext.getBroadcastVariable("gradient").toList.head
>      }
>
>     def map(x: Vector) = {(x + update * (gradient
> sign)).clip(config.minWidth, config.maxWidth)}
>     }).withBroadcastSet(update, "update")withBroadcastSet(gradient,
> "gradient")
>     */
>
>     width union update union lastGradient
> }
>
>
> Error:
>
> java.lang.IllegalStateException
> at
> org.apache.flink.compiler.dag.BulkPartialSolutionNode.setCandidateProperties(BulkPartialSolutionNode.java:50)
> at
> org.apache.flink.compiler.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:292)
> at
> org.apache.flink.compiler.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:367)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:315)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:105)
> at
> org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:104)
> at
> org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
> at
> org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:197)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:210)
> at org.apache.flink.client.program.Client.run(Client.java:288)
> at org.apache.flink.client.program.Client.run(Client.java:231)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>

Mime
View raw message