flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: No Nested Iterations??? And where is the Nested Iteration?
Date Wed, 12 Nov 2014 16:15:06 GMT
Ok. By "for-loop style" do you mean a loop with a fixed range?
In my case I would have a delta-iteration inside a bulk-iteration. I guess
that wouldn't be "roll-out-able"?

By the way, is there any intention to allow bulk-style iterations on several
datasets "concurrently"?

Maybe we could discuss my problem next week at the meetup?

Thank you for the offer, but I'm in the middle of my thesis, so I don't have
time for it.

Cheers,
Max
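
For concreteness, here is a minimal plain-Scala sketch of what "roll-out"
(for-loop style) iteration means: the loop is unrolled when the plan is
built, so nesting two such loops simply chains more operators. `Seq[Double]`
stands in for a Flink `DataSet`, and `rollOut` and the `+ 1.0` step function
are made up for illustration, not actual Flink API.

```scala
// Sketch of "roll-out" (for-loop style) iteration: because the loops are
// unrolled at plan-construction time, nesting them is unproblematic --
// each pass just appends one more transformation to the chain, instead of
// creating a closed-loop iteration operator.
def rollOut(input: Seq[Double], outer: Int, inner: Int): Seq[Double] = {
  var current = input
  for (_ <- 0 until outer; _ <- 0 until inner) {
    current = current.map(_ + 1.0) // one unrolled "superstep"
  }
  current
}
```

In Flink terms, `rollOut(data, 2, 3)` would correspond to six chained `map`
operators in the dataflow, not to a nested iteration construct.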

On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <sewen@apache.org> wrote:

> We are not planning to add closed-loop nested iterations in the near
> future. That is a bit of an effort, and I think no one can pick it up
> very soon.
>
> We will soon support roll-out iterations (for-loop style) much more
> efficiently. There is no reason why you could not nest two for-loops.
> However, those are only bulk-style, not delta-iteration style.
>
> If you would like to contribute iteration nesting, I could help you to get
> started.
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Oh sorry, I just read the bug title. So my question is: when are you
>> planning to add nested iterations?
>>
>> Cheers,
>> Max
>>
>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok, thanks.
>>>
>>> But does the bug mean that Flink "sees" a nested iteration where none
>>> exists? Or is the bug that nested iterations are not supported? If the
>>> latter, when do you plan to add this feature?
>>> I need nested iterations for my algorithm, so it would be nice
>>> to know when I can expect them.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>
>>>> You can watch that one to keep updated.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> I am looking into it right now...
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Have you already had time to investigate this issue?
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey!
>>>>>>>
>>>>>>> Clearly, this looks like a bug. Let me investigate that and get back
>>>>>>> to you later...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flinksters!
>>>>>>>>
>>>>>>>> First some good news: the cumsum code from the last issue now works
>>>>>>>> correctly and is tested.
>>>>>>>>
>>>>>>>> Bad news (at least for me): I just ran into this (for the error and
>>>>>>>> code, see below). Do you have a road map for when this feature will
>>>>>>>> be available? Regardless of the rest, I would need it in the near
>>>>>>>> future.
>>>>>>>>
>>>>>>>> So far so good. But I wonder where this nested iteration is supposed
>>>>>>>> to be. At least I do not see it... I have an iteration and, inside it,
>>>>>>>> a lot of filters/maps/etc., but not another iteration.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> Error:
>>>>>>>>
>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>     at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>     at org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Nested Iterations are not possible at the moment!
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>     at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>     at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>     at org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>> ... 14 more
>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested Iterations are not possible at the moment!
>>>>>>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>> ... 33 more
>>>>>>>>
>>>>>>>> Code:
>>>>>>>>
>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>   val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>   val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>>>>>>>>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>>>>>>>   val widthCandidates = env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>
>>>>>>>>   val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>
>>>>>>>>   val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>>>>
>>>>>>>>   x map { _ toString } writeAsText config.outFile
>>>>>>>> }
>>>>>>>>
>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
>>>>>>>>   val residual_2 = residual * residual
>>>>>>>>   val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)
>>>>>>>>
>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>   val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>   val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2, config.N+1, Array("id")) {
>>>>>>>>     (solutionset, workset) =>
>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector] {
>>>>>>>>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
>>>>>>>>       })
>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>
>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>       (sum map (new RichMapFunction[Vector, Vector] {
>>>>>>>>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
>>>>>>>>       }),
>>>>>>>>       new_workset)
>>>>>>>>   }
>>>>>>>>   val index = cumSum.filter(new RichFilterFunction[Vector]() {
>>>>>>>>     var y: Vector = null
>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>       y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>     }
>>>>>>>>     def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>   }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>>
>>>>>>>>   val center = X.filter(new RichFilterFunction[Vector]() {
>>>>>>>>     var index: Int = -1
>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>       val x: Tuple1[Int] = getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>       index = x._1
>>>>>>>>     }
>>>>>>>>     def filter(x: Vector) = x.id == index
>>>>>>>>   }).withBroadcastSet(index, "index")
>>>>>>>>
>>>>>>>>   center neutralize
>>>>>>>> }
>>>>>>>>
>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector], width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>   X.map(new RichMapFunction[Vector, Vector] {
>>>>>>>>     var center: Vector = null
>>>>>>>>     var width: Vector = null
>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>       center = getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>       width = getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     def map(x: Vector) = new Vector(x.id, Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>   }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], widthCandidates: DataSet[Vector], center: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>   val costs = emptyDataSet.iterateDelta(widthCandidates, config.NWidthCandidates, Array("id")) {
>>>>>>>>     (solutionset, workset) =>
>>>>>>>>       val currentWidth = workset filter (new RichFilterFunction[Vector] {
>>>>>>>>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber - 1)
>>>>>>>>       })
>>>>>>>>
>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>
>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>
>>>>>>>>       val cost = (x1 / x2) neutralize
>>>>>>>>
>>>>>>>>       (cost map (new RichMapFunction[Vector, Vector] {
>>>>>>>>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber - 1, x.values)
>>>>>>>>       }),
>>>>>>>>       workset)
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   // todo: will not work
>>>>>>>>   //val width = costs max(0)
>>>>>>>>
>>>>>>>>   //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>
>>>>>>>>   //val x1 = kernelVector dot residual
>>>>>>>>   //val x2 = kernelVector dot kernelVector
>>>>>>>>   //val height = x1 / x2
>>>>>>>>   costs
>>>>>>>> }
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
