flink-user mailing list archives

From: Stephan Ewen <se...@apache.org>
Subject: Re: No Nested Iterations??? And where is the Nested Iteration?
Date: Wed, 12 Nov 2014 15:21:01 GMT
I found the cause of the bug and have opened a JIRA to track it.

https://issues.apache.org/jira/browse/FLINK-1235

You can watch that one to keep updated.

Stephan


On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> I am looking into it right now...
>
> Stephan
>
>
> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Have you already had time to investigate this issue?
>>
>> Cheers,
>> Max
>>
>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hey!
>>>
>>> Clearly, this looks like a bug. Let me investigate that and get back to
>>> you later...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hi Flinksters!
>>>>
>>>> First, some good news: the cumsum code from the last issue now works
>>>> correctly and is tested.
>>>>
>>>> Bad news (at least for me): I just ran into this (see the error and
>>>> code below). Do you have a roadmap for when this feature will be
>>>> available? Regardless of the rest, I would need it in the near future.
>>>>
>>>> So far so good. But I wonder where this nested iteration is supposed to
>>>> be; at least I do not see it... I have one iteration and, inside it, a
>>>> lot of filters/maps/etc., but not another iteration.
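>>>>
>>>> For comparison, here is a minimal sketch of what I understand an
>>>> explicitly nested iteration to be (someDataSet is just a placeholder);
>>>> nothing in my program looks like this:
>>>>
>>>> // hypothetical example: an iterate(...) call inside the step function
>>>> // of another iteration -- the construct the compiler rejects
>>>> val nested = someDataSet.iterate(10) { outerPartial =>
>>>>   // starting a second bulk iteration here makes it nested
>>>>   outerPartial.iterate(5) { innerPartial =>
>>>>     innerPartial map { x => x }
>>>>   }
>>>> }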
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> Error:
>>>>
>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>> Iterations are not possible at the moment!
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>   at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>   at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>> occurred while translating the optimized plan to a nephele JobGraph: Nested
>>>> Iterations are not possible at the moment!
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>   at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>   at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>   at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>> ... 14 more
>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>> Iterations are not possible at the moment!
>>>>   at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>> ... 33 more
>>>>
>>>> Code:
>>>>
>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>     val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>>>>     val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>>>>     val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>>>     val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>>>>
>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>
>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>
>>>>     x map { _ toString } writeAsText config.outFile
>>>> }
>>>>
>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
>>>>     val residual_2 = residual * residual
>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)
>>>>
>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
>>>>       (solutionset, workset) =>
>>>>         val current = workset filter (new RichFilterFunction[Vector]{
>>>>           def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>         })
>>>>         val old_sum = workset filter {_.id == -1}
>>>>         val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>
>>>>         val new_workset = workset filter {_.id != -1} union sum
>>>>         (sum map (new RichMapFunction[Vector, Vector]{
>>>>           def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>         }),
>>>>         new_workset)
>>>>     }
>>>>
>>>>     val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>       var y: Vector = null
>>>>       override def open(config: Configuration) = {
>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>       }
>>>>       def filter(x: Vector) = x.values(0) < y.values(0)
>>>>     }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>
>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>       var index: Int = -1
>>>>       override def open(config: Configuration) = {
>>>>         val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>         index = x._1
>>>>       }
>>>>       def filter(x: Vector) = x.id == index
>>>>     }).withBroadcastSet(index, "index")
>>>>
>>>>     center neutralize
>>>> }
>>>>
>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>       var center: Vector = null
>>>>       var width: Vector = null
>>>>       override def open(config: Configuration) = {
>>>>         center = getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>         width = getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>       }
>>>>
>>>>       def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>>>> }
>>>>
>>>>
>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
>>>>       (solutionset, workset) =>
>>>>         val currentWidth = workset filter (new RichFilterFunction[Vector]{
>>>>           def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>         })
>>>>
>>>>         val kernelVector = getKernelVector(X, center, currentWidth)
>>>>
>>>>         val x1 = kernelVector dot residual map {x => x*x}
>>>>         val x2 = kernelVector dot kernelVector
>>>>
>>>>         val cost = (x1 / x2) neutralize
>>>>
>>>>         (cost map (new RichMapFunction[Vector, Vector]{
>>>>           def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>         }),
>>>>         workset)
>>>>     }
>>>>
>>>>     // todo: will not work
>>>>     //val width = costs max(0)
>>>>
>>>>     //val kernelVector = getKernelVector(X, center, width)
>>>>
>>>>     //val x1 = kernelVector dot residual
>>>>     //val x2 = kernelVector dot kernelVector
>>>>     //val height = x1 / x2
>>>>
>>>>     costs
>>>> }
>>>>
>>>
>>>
>>
>
