flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 07 Oct 2014 14:34:58 GMT
Hey!

Is the algorithm you are using a delta iteration in fact. If you actually
do not use the solution set, can you model it as a bulk-iteration?

If you actually need the solution set to accumulate data, we can probably
deactivate that check in the compiler. As far as I remember, there is no
requirement in the runtime to join with the solution set. The check is
meant to help programmers that forgot the join...

Greetings,
Stephan



On Tue, Oct 7, 2014 at 3:13 PM, Maximilian Alber <alber.maximilian@gmail.com
> wrote:

> Hi Flinksters!
>
> I would like to use iterateDelta function. I don't need the solution set
> inside the step function, because I generate a different values out of the
> working set. Unfortunately the compiler of the development version doesn't
> like that. Is there a workaround?
>
> The code:
>
> val residual_2a = residual_2 union
> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
> val emptyDataSet = env.fromCollection[Vector](Seq())
> val cumSum = emptyDataSet.iterateDelta(residual_2a, 1000000, Array("id")) {
>    (solutionset, workset) =>
>    val old_sum = workset filter {_.id == -1}
>    val current = workset filter (new RichFilterFunction[Vector]{
>      def filter(x: Vector) = x.id ==
> (getIterationRuntimeContext.getSuperstepNumber)
>      })
>    val residual_2 = workset filter {_.id != -1}
>    val sum = VectorDataSet.add(old_sum, current)
>
>    (sum map (new RichMapFunction[Vector, Vector]{
>      def map(x: Vector) = new
> Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>     }),
>    residual_2 union sum)
> }
>
> The error:
>
> org.apache.flink.compiler.CompilerException: Error: The step function does
> not reference the solution set.
> at
> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:868)
> at
> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:622)
> at
> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
> at
> org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:202)
> at
> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:286)
> at org.apache.flink.api.common.Plan.accept(Plan.java:281)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:517)
> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
> at org.apache.flink.client.program.Client.run(Client.java:285)
> at org.apache.flink.client.program.Client.run(Client.java:230)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
>

Mime
View raw message