flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Maximilian Alber <alber.maximil...@gmail.com>
Subject Forced to use Solution Set in Step Function
Date Tue, 07 Oct 2014 13:13:57 GMT
Hi Flinksters!

I would like to use iterateDelta function. I don't need the solution set
inside the step function, because I generate a different values out of the
working set. Unfortunately the compiler of the development version doesn't
like that. Is there a workaround?

The code:

val residual_2a = residual_2 union
env.fromCollection(Seq(Vector.zeros(config.dimensions)))
val emptyDataSet = env.fromCollection[Vector](Seq())
val cumSum = emptyDataSet.iterateDelta(residual_2a, 1000000, Array("id")) {
   (solutionset, workset) =>
   val old_sum = workset filter {_.id == -1}
   val current = workset filter (new RichFilterFunction[Vector]{
     def filter(x: Vector) = x.id ==
(getIterationRuntimeContext.getSuperstepNumber)
     })
   val residual_2 = workset filter {_.id != -1}
   val sum = VectorDataSet.add(old_sum, current)

   (sum map (new RichMapFunction[Vector, Vector]{
     def map(x: Vector) = new
Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
    }),
   residual_2 union sum)
}

The error:

org.apache.flink.compiler.CompilerException: Error: The step function does
not reference the solution set.
at
org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:868)
at
org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:622)
at
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
at
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:202)
at
org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:286)
at org.apache.flink.api.common.Plan.accept(Plan.java:281)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:517)
at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
at org.apache.flink.client.program.Client.run(Client.java:285)
at org.apache.flink.client.program.Client.run(Client.java:230)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)

Thanks!
Cheers,
Max

Mime
View raw message