flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: No Nested Iterations??? And where is the Nested Iteration?
Date Wed, 12 Nov 2014 16:19:04 GMT
Talking at the meetup sounds good!

On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok. By "for loop style", do you mean a loop with a fixed range?
> In my case I would have a delta-iteration inside a bulk-iteration. I guess
> that wouldn't be "roll-out-able"?
>
> Btw is there any intention to allow bulk-style iterations on several
> datasets "concurrently"?
>
> Maybe we could discuss my problem next week at the meetup?
>
> Thank you for the offer, but I'm in the middle of my thesis, so I don't
> have time for it.
>
> Cheers,
> Max
>
> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> We are not planning to add closed-loop nested iterations in the near
>> future. That is a bit of an effort, and I think no one can pick it up
>> very soon.
>>
>> We will be supporting roll-out iterations (for loop style) much more
>> efficiently soon. There is no reason why you could not nest two for-loops.
>> However, those are only bulk-style, not delta-iteration style.
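The roll-out style mentioned above can be sketched in plain Scala. This is an illustrative sketch, not code from the thread: `rollOut`, `step`, `outerSteps`, and `innerSteps` are made-up names, and each `step` call stands in for one unrolled bulk-iteration superstep that the driver program would issue against the runtime.

```scala
// Roll-out ("for loop") style: instead of a closed-loop iteration operator,
// the driver replays the step function a fixed number of times. Two such
// loops nest naturally, with no special support needed from the engine.
def rollOut[T](data: Seq[T], outerSteps: Int, innerSteps: Int)
              (step: Seq[T] => Seq[T]): Seq[T] = {
  var current = data
  for (_ <- 1 to outerSteps; _ <- 1 to innerSteps)
    current = step(current) // each call corresponds to one unrolled superstep
  current
}
```

For example, `rollOut(Seq(1, 2, 3), 2, 3)(_.map(_ + 1))` applies the step 2 × 3 = 6 times. Because the loop bounds are fixed in the driver, this only covers bulk-style iterations with a known range, which matches the limitation described above.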
>>
>> If you would like to contribute iteration nesting, I could help you to
>> get started.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Oh sorry, I just read the bug title. So my question is: when are you
>>> planning to add nested iterations?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Ok, thanks.
>>>>
>>>> But does the bug cause Flink to "see" a nested iteration where there
>>>> is none?
>>>> Or is it a bug that nested iterations are not supported? If the latter,
>>>> when do you plan to add this feature?
>>>> I need nested iterations for my algorithm, so it would be nice
>>>> to know when I can expect them.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>
>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>
>>>>> You can watch that one to keep updated.
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <sewen@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I am looking into it right now...
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> have you already had time to investigate this issue?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey!
>>>>>>>>
>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>> back to you later...
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Flinksters!
>>>>>>>>>
>>>>>>>>> First, some good news: the cumsum code from the last issue now
>>>>>>>>> works correctly and is tested.
>>>>>>>>>
>>>>>>>>> Bad news (at least for me): I just ran into this (see below for
>>>>>>>>> the error and code). Do you have a roadmap for when this feature
>>>>>>>>> will be available? Regardless, I will need it in the near future.
>>>>>>>>>
>>>>>>>>> So far, so good. But I wonder where this nested iteration is
>>>>>>>>> supposed to be; I do not see one. I have an iteration containing
>>>>>>>>> a lot of filters/maps/etc., but no second iteration inside it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> Error:
>>>>>>>>>
>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An error
>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph:
>>>>>>>>> Nested Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>> occurred while translating the optimized plan to a nephele JobGraph:
>>>>>>>>> Nested Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>> ... 14 more
>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>> at
>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>> ... 33 more
>>>>>>>>>
>>>>>>>>> Code:
>>>>>>>>>
>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>     val X = env readTextFile config.xFile map
>>>>>>>>> {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>     val residual = env readTextFile config.yFile map
>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>     val randoms = env readTextFile config.randomFile map
>>>>>>>>> {Vector.parseFromString(_)}
>>>>>>>>>     val widthCandidates = env readTextFile
>>>>>>>>> config.widthCandidatesFile map {Vector.parseFromString(config.dimensions,
>>>>>>>>> _)}
>>>>>>>>>
>>>>>>>>>     val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>
>>>>>>>>>     val x = calcWidthHeight(env, X, residual, widthCandidates,
>>>>>>>>> center)
>>>>>>>>>
>>>>>>>>>     x map { _ toString } writeAsText config.outFile
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>> residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int):
>>>>>>>>> DataSet[Vector] = {
>>>>>>>>>     val residual_2 = residual * residual
>>>>>>>>>     val ys = (residual_2 sumV) * (randoms filter {_.id ==
>>>>>>>>> iteration} neutralize)
>>>>>>>>>
>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>     val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>     val cumSum = emptyDataSet.iterateDelta(sumVector union
>>>>>>>>> residual_2, config.N+1, Array("id")) {
>>>>>>>>>       (solutionset, workset) =>
>>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>         def filter(x: Vector) = x.id ==
>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>        })
>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>
>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>        (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>           def map(x: Vector) = new
>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>        }),
>>>>>>>>>       new_workset)
>>>>>>>>>      }
>>>>>>>>>    val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>       var y: Vector = null
>>>>>>>>>      override def open(config: Configuration) = {
>>>>>>>>>         y =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>      }
>>>>>>>>>      def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>    }).withBroadcastSet(ys, "ys") map {x: Vector =>
>>>>>>>>> Tuple1(1)} sum 0
>>>>>>>>>
>>>>>>>>>     val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>     var index: Int = -1
>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>       index = x._1
>>>>>>>>>        }
>>>>>>>>>       def filter(x: Vector) = x.id == index
>>>>>>>>>     }).withBroadcastSet(index, "index")
>>>>>>>>>
>>>>>>>>>     center neutralize
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>>> width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>     X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>       var center: Vector = null
>>>>>>>>>       var width: Vector = null
>>>>>>>>>       override def open(config: Configuration) = {
>>>>>>>>>        center =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>        width =
>>>>>>>>> getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>      }
>>>>>>>>>
>>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>>> Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>     }).withBroadcastSet(center, "center").withBroadcastSet(width,
>>>>>>>>> "width")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>> residual: DataSet[Vector], widthCandidates: DataSet[Vector], center:
>>>>>>>>> DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>     val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>>> config.NWidthCandidates, Array("id")) {
>>>>>>>>>        (solutionset, workset) =>
>>>>>>>>>        val currentWidth = workset filter (new
>>>>>>>>> RichFilterFunction[Vector]{
>>>>>>>>>          def filter(x: Vector) = x.id ==
>>>>>>>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>        })
>>>>>>>>>
>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>
>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>
>>>>>>>>>      val cost = (x1 / x2) neutralize
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>        def map(x: Vector) = new
>>>>>>>>> Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>      }),
>>>>>>>>>      workset)
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> // todo: will not work
>>>>>>>>> //val width = costs max(0)
>>>>>>>>>
>>>>>>>>> //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>
>>>>>>>>> //val x1 = kernelVector dot residual
>>>>>>>>> //val x2 = kernelVector dot kernelVector
>>>>>>>>> //val height = x1 / x2
>>>>>>>>>     costs
>>>>>>>>> }
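The `cumSum` delta iteration in `calcCenter` above builds a running prefix sum over the squared residuals, one element per superstep, so that the index where the sum crosses the `ys` threshold can be found. As an illustrative sketch (not from the thread, using plain Scala `Float` values in place of the `Vector` type), the same computation on an in-memory collection is a one-line `scanLeft`:

```scala
// Sketch of what the cumSum delta iteration computes: element i of the
// result is the sum of values(0..i). scanLeft carries the running sum;
// the leading seed value (0.0f) is dropped with tail.
def cumSum(values: Seq[Float]): Seq[Float] =
  values.scanLeft(0.0f)(_ + _).tail
```

For example, `cumSum(Seq(1.0f, 2.0f, 3.0f))` yields `Seq(1.0f, 3.0f, 6.0f)`. The delta iteration is only needed because the data lives in a distributed `DataSet`, where no sequential scan is available.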
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
