flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: No Nested Iterations??? And where is the Nested Iteration?
Date Wed, 12 Nov 2014 15:47:32 GMT
Oh sorry, I just read the bug title. So my question is: when are you
planning to add nested iterations?

Cheers,
Max

On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok, thanks.
>
> But does the bug cause Flink to "see" a nested iteration where none exists?
> Or is the bug that nested iterations are not supported? If the latter, when
> do you plan to add this feature?
> Because I need nested iterations for my algorithm, it would be nice to
> know when I can expect them.
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> I found the cause of the bug and have opened a JIRA to track it.
>>
>> https://issues.apache.org/jira/browse/FLINK-1235
>>
>> You can watch that one to keep updated.
>>
>> Stephan
>>
>>
>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi!
>>>
>>> I am looking into it right now...
>>>
>>> Stephan
>>>
>>>
>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> you already had time to investigate this issue?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hey!
>>>>>
>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>> to you later...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Hi Flinksters!
>>>>>>
>>>>>> First some good news: the cumsum code from the last issue now works
>>>>>> correctly and is tested.
>>>>>>
>>>>>> Bad news (at least for me): I just ran into this (see the error and
>>>>>> code below). Do you have a roadmap for when this feature will be
>>>>>> available? Regardless of the rest, I will need it in the near future.
>>>>>>
>>>>>> So far so good. But I wonder where this nested iteration is supposed
>>>>>> to be. At least I do not see one... I have an iteration containing a
>>>>>> lot of filters/maps/etc., but not another iteration.
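[One way such implicit nesting can arise (a hedged guess, not a confirmed diagnosis of the program below): when the result of one delta iteration is consumed, e.g. as a broadcast set, by an operator inside another iteration's step function, the plan reachable from that step function contains the first iteration, even though no loop is textually inside another. A toy plan model, with hypothetical names that are not Flink's actual optimizer classes, illustrates the traversal:]

```scala
// Toy model of how a plan translator can report nested iterations even
// though the program text contains no syntactically nested loops.
// Hypothetical sketch: these are NOT Flink's actual optimizer classes.
sealed trait PlanNode { def inputs: Seq[PlanNode] }
case class Source(name: String) extends PlanNode { def inputs = Seq.empty }
case class MapOp(input: PlanNode) extends PlanNode { def inputs = Seq(input) }
// An iteration owns a step function, which is itself a sub-plan.
case class Iteration(input: PlanNode, step: PlanNode) extends PlanNode {
  def inputs = Seq(input)
}

// True if an Iteration node is reachable anywhere in the given sub-plan.
def containsIteration(node: PlanNode): Boolean = node match {
  case Iteration(_, _) => true
  case other           => other.inputs.exists(containsIteration)
}

// True if some iteration's step function reaches another iteration.
def hasNestedIteration(plan: PlanNode): Boolean = plan match {
  case Iteration(input, step) =>
    containsIteration(step) || hasNestedIteration(input)
  case other => other.inputs.exists(hasNestedIteration)
}

// Two iterations composed only sequentially in the source code: the second
// one's step function consumes `center`, which is derived from the first
// iteration's result (as a broadcast set would be). Walking the second
// iteration's step function therefore reaches the first Iteration node.
val first  = Iteration(Source("randoms"), MapOp(Source("workset")))
val center = MapOp(first)
val second = Iteration(Source("widthCandidates"), MapOp(center))
```

[Here `hasNestedIteration(second)` is true, matching the compiler's complaint, while neither iteration is textually inside the other.]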
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> Error:
>>>>>>
>>>>>> org.apache.flink.compiler.CompilerException: An error occurred while
>>>>>> translating the optimized plan to a nephele JobGraph: An error occurred
>>>>>> while translating the optimized plan to a nephele JobGraph: Nested
>>>>>> Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>> at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>> at
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>> occurred while translating the optimized plan to a nephele JobGraph:
>>>>>> Nested Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>> at
>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>> ... 14 more
>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>> Iterations are not possible at the moment!
>>>>>> at
>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>> ... 33 more
>>>>>>
>>>>>> Code:
>>>>>>
>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>     val X = env readTextFile config.xFile map
>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>     val residual = env readTextFile config.yFile map
>>>>>> {Vector.parseFromString(_)}
>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>> {Vector.parseFromString(_)}
>>>>>>     val widthCandidates = env readTextFile config.widthCandidatesFile
>>>>>> map {Vector.parseFromString(config.dimensions, _)}
>>>>>>
>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>
>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>>
>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>> }
>>>>>>
>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>> DataSet[Vector] = {
>>>>>>     val residual_2 = residual * residual
>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id == iteration}
>>>>>> neutralize)
>>>>>>
>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>       (solutionset, workset) =>
>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>         def filter(x: Vector) = x.id ==
>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>        })
>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>
>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>           def map(x: Vector) = new
>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>        }),
>>>>>>       new_workset)
>>>>>>      }
>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>       var y: Vector = null
>>>>>>      override def open(config: Configuration) = {
>>>>>>         y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>      }
>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>> }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>
>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>     var index: Int = -1
>>>>>>     override def open(config: Configuration) = {
>>>>>>       val x: Tuple1[Int] =
>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>       index = x._1
>>>>>>        }
>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>
>>>>>>     center neutralize
>>>>>> }
>>>>>>
>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>       var center: Vector = null
>>>>>>       var width: Vector = null
>>>>>>       override def open(config: Configuration) = {
>>>>>>        center =
>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>        width =
>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>      }
>>>>>>
>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>> "width")
>>>>>> }
>>>>>>
>>>>>>
>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>        (solutionset, workset) =>
>>>>>>        val currentWidth = workset filter (new
>>>>>> RichFilterFunction[Vector]{
>>>>>>          def filter(x: Vector) = x.id ==
>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>        })
>>>>>>
>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>
>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>
>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>
>>>>>>
>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>        def map(x: Vector) = new
>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>      }),
>>>>>>      workset)
>>>>>>     }
>>>>>>
>>>>>> // todo: will not work
>>>>>> //val width = costs max(0)
>>>>>>
>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>
>>>>>> //val x1 = kernelVector dot residual
>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>> //val height = x1 / x2
>>>>>>     costs
>>>>>> }
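[For reference, what the cumSum delta iteration in calcCenter above computes can be restated with plain Scala collections (a local sketch over Seq, not Flink DataSets; the sample values are invented):]

```scala
// Plain-collections restatement of the cumSum delta iteration above.
// Sample values are made up for illustration.
val residual2 = Seq(0.5f, 1.0f, 0.25f, 2.0f) // residual * residual, by id

// Each superstep folds the element with id == superstep - 1 into the
// running sum, i.e. a prefix sum over residual2.
val cumSum = residual2.scanLeft(0.0f)(_ + _).tail
// cumSum: Seq(0.5, 1.5, 1.75, 3.75)

// `index` counts how many partial sums stay below the sampled threshold
// ys -- the broadcast-variable filter plus the Tuple1(1) sum.
val ys = 1.6f
val index = cumSum.count(_ < ys)
// index: 2
```

[The selected center is then the row of X whose id equals this count, matching the broadcast-variable filter on `index` in the program.]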
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
