flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: No Nested Iterations??? And where is the Nested Iteration?
Date Tue, 21 Oct 2014 12:03:45 GMT
Hey!

Clearly, this looks like a bug. Let me investigate that and get back to you
later...

Greetings,
Stephan


On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Flinksters!
>
> First some good news: the cumsum code from the last issue works now
> correctly and is tested.
>
> Bad news (at least for me): I just ran into this (for the error and code,
> see below). Do you have a road map for when this feature will be available?
> Regardless of the rest, I would need it in the near future.
>
> So far so good. But I wonder where this nested iteration is supposed to be.
> At least I do not see one... I have an iteration and, inside it, a lot of
> filters/maps/etc., but not another iteration.
>
> Cheers,
> Max
>
> Error:
>
> org.apache.flink.compiler.CompilerException: An error occurred while
> translating the optimized plan to a nephele JobGraph: An error occurred
> while translating the optimized plan to a nephele JobGraph: Nested
> Iterations are not possible at the moment!
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
> at org.apache.flink.client.program.Client.run(Client.java:290)
> at org.apache.flink.client.program.Client.run(Client.java:285)
> at org.apache.flink.client.program.Client.run(Client.java:230)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> Caused by: org.apache.flink.compiler.CompilerException: An error occurred
> while translating the optimized plan to a nephele JobGraph: Nested
> Iterations are not possible at the moment!
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
> at
> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
> ... 14 more
> Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations
> are not possible at the moment!
> at
> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
> ... 33 more
>
> Code:
>
> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>     val X = env readTextFile config.xFile map
> {Vector.parseFromString(config.dimensions, _)}
>     val residual = env readTextFile config.yFile map
> {Vector.parseFromString(_)}
>     val randoms = env readTextFile config.randomFile map
> {Vector.parseFromString(_)}
>     val widthCandidates = env readTextFile config.widthCandidatesFile map
> {Vector.parseFromString(config.dimensions, _)}
>
>     val center = calcCenter(env, X, residual, randoms, 0)
>
>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>
>     x map { _ toString } writeAsText config.outFile
> }
>
> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
> = {
>     val residual_2 = residual * residual
>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
> neutralize)
>
>     val emptyDataSet = env.fromCollection[Vector](Seq())
>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
> config.N+1, Array("id")) {
>       (solutionset, workset) =>
>       val current = workset filter (new RichFilterFunction[Vector]{
>         def filter(x: Vector) = x.id ==
> (getIterationRuntimeContext.getSuperstepNumber-1)
>        })
>       val old_sum = workset filter {_.id == -1}
>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>
>       val new_workset = workset filter {_.id != -1} union sum
>        (sum map (new RichMapFunction[Vector, Vector]{
>           def map(x: Vector) = new
> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>        }),
>       new_workset)
>      }
>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>       var y: Vector = null
>      override def open(config: Configuration) = {
>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>      }
>      def filter(x: Vector) = x.values(0) < y.values(0)
>    }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>
>     val center = X.filter(new RichFilterFunction[Vector](){
>     var index: Int = -1
>     override def open(config: Configuration) = {
>       val x: Tuple1[Int] =
> getRuntimeContext.getBroadcastVariable("index").toList.head
>       index = x._1
>        }
>       def filter(x: Vector) = x.id == index
>     }).withBroadcastSet(index, "index")
>
>     center neutralize
> }
>
> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width:
> DataSet[Vector]): DataSet[Vector] = {
>     X.map(new RichMapFunction[Vector, Vector]{
>       var center: Vector = null
>       var width: Vector = null
>       override def open(config: Configuration) = {
>        center =
> getRuntimeContext.getBroadcastVariable("center").toList.head
>        width = getRuntimeContext.getBroadcastVariable("width").toList.head
>      }
>
>     def map(x: Vector) = new Vector(x.id,
> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>     }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
> }
>
>
> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
> DataSet[Vector]): DataSet[Vector] = {
>     val emptyDataSet = env.fromCollection[Vector](Seq())
>     val costs = emptyDataSet.iterateDelta(widthCandidates,
> config.NWidthCandidates, Array("id")) {
>        (solutionset, workset) =>
>        val currentWidth = workset filter (new RichFilterFunction[Vector]{
>          def filter(x: Vector) = x.id ==
> (getIterationRuntimeContext.getSuperstepNumber-1)
>        })
>
>       val kernelVector = getKernelVector(X, center, currentWidth)
>
>       val x1 = kernelVector dot residual map {x => x*x}
>       val x2 = kernelVector dot kernelVector
>
>      val cost = (x1 / x2) neutralize
>
>
>      (cost map (new RichMapFunction[Vector, Vector]{
>        def map(x: Vector) = new
> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>      }),
>      workset)
>     }
>
> // todo: will not work
> //val width = costs max(0)
>
> //val kernelVector = getKernelVector(X, center, width)
>
> //val x1 = kernelVector dot residual
> //val x2 = kernelVector dot kernelVector
> //val height = x1 / x2
>     costs
> }
>

Mime
View raw message