flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: It is currently not supported to union between dynamic and static path in an iteration.
Date Wed, 10 Dec 2014 15:35:19 GMT
Hi!

Yes, that is a known limitation. It is sort of straightforward to resolve,
but will take a bit of time. I will try and get to it until the end of the
week.

Stephan


On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi flinksters,
>
> I'm really close to the end (at least I hope so), but I still have some
> issues.
> Writing my final loop I got this error:
>
> org.apache.flink.compiler.CompilerException: An error occurred while
> translating the optimized plan to a nephele JobGraph: An error occurred
> while translating the optimized plan to a nephele JobGraph: Error: It is
> currently not supported to union between dynamic and static path in an
> iteration.
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>
> I guess that I should use just the loop step set to create the next step
> set?
>
> Here is the code:
>
> def createPlanFullIterations(env: ExecutionEnvironment) = {
>    val tmp = env readTextFile config.inFile map
> {Vector.parseFromSVMLightString (config.dimensions, _)}
>    val X = tmp map {_._1}
>    var residual = tmp map {_._2}
>    val randoms = env readTextFile config.randomFile map
> {Vector.parseFromString(_)}
>    val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile !=
> null)
>      env readTextFile config.widthCandidatesFile map
> {Vector.parseFromString(config.dimensions, _)}
>    else
>       null
>
>    val emptyDataSet = env.fromCollection[Vector](Seq())
>    val model = emptyDataSet.iterate(config.iterations){
>      stepSet =>
>      val center = calcCenter(env, X, residual, randoms, -1)
>
>      val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>      val width = x._1
>      val height = x._2
>
>      residual = residual - (getKernelVector(X, center, width).map(new
> RichMapFunction[Vector, Vector]{
>        var height: Vector = null
>        override def open(config: Configuration) = {
>          height =
> getRuntimeContext.getBroadcastVariable("height").toList.head
>        }
>        def map(x: Vector) = {x * height}
>        }).withBroadcastSet(height, "height"))
>
>      val centerOut = center map {x => new Vector(0, x.values)}
>      val widthOut = width map {x => new Vector(1, x.values)}
>      val heightOut = height map {x => new Vector(2, x.values)}
>      val stepModel = centerOut union widthOut union height
>
>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>      def map(x: Vector) = new
> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>      }))
>     }
>
>     model map { _ toString } writeAsText config.outFile
> }
>
>
> thanks!
> Cheers,
> Max
>

Mime
View raw message