flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: No Nested Iterations??? And where is the Nested Iteration?
Date Thu, 13 Nov 2014 10:30:45 GMT
The error is gone.
Thanks!

Cheers,
Max

On Wed, Nov 12, 2014 at 11:41 PM, Stephan Ewen <sewen@apache.org> wrote:

> The current master contains a fix for the incorrectly identified nested
> iteration bug.
>
> Please let us know if it fixes your problem!
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 12, 2014 at 5:19 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Talking at the meetup sounds good!
>>
>> On Wed, Nov 12, 2014 at 5:15 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok. By "for loop style", do you mean a loop with a fixed range?
>>> In my case I would have a delta-iteration inside a bulk-iteration. I
>>> guess that wouldn't be "roll-out-able"?
>>>
>>> Btw is there any intention to allow bulk-style iterations on several
>>> datasets "concurrently"?
>>>
>>> Maybe we could discuss my problem next week at the meetup?
>>>
>>> Thank you for the offer, but I'm in the middle of my thesis, so I
>>> don't have time for it.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Nov 12, 2014 at 4:59 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> We are not planning to add closed-loop nested iterations in the near
>>>> future. That is a bit of an effort, and I think no one can pick it up
>>>> very soon.
>>>>
>>>> We will be supporting roll-out iterations (for loop style) much more
>>>> efficiently soon. There is no reason why you could not nest two for-loops.
>>>> However, those are only bulk-style, not delta-iteration style.
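The roll-out (for-loop style) nesting described above can be sketched with plain Scala collections standing in for DataSets. This is only an illustration of the idea, not Flink API: `RollOutSketch` and `innerIterate` are made-up names, and a `Seq` plays the role of a `DataSet`.

```scala
// Sketch only: a Seq stands in for a Flink DataSet, and `innerIterate`
// stands in for a bulk iteration; neither is real Flink API.
object RollOutSketch {
  // One bulk-style inner iteration: apply `step` a fixed number of times.
  def innerIterate[A](data: Seq[A], steps: Int)(step: Seq[A] => Seq[A]): Seq[A] =
    (1 to steps).foldLeft(data)((d, _) => step(d))

  def main(args: Array[String]): Unit = {
    var data = Seq(1.0, 2.0, 3.0)
    // The outer "iteration" is rolled out as a driver-side for loop:
    // each pass wraps a fresh inner bulk iteration, so no iteration
    // operator is ever nested inside another.
    for (_ <- 1 to 3)
      data = innerIterate(data, 2)(d => d.map(_ * 2.0))
    println(data) // List(64.0, 128.0, 192.0)
  }
}
```

The point of the sketch: rolling out turns the outer loop into plan construction (each pass builds and runs a separate plan), which is why it only works for a fixed, bulk-style range.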
>>>>
>>>> If you would like to contribute iteration nesting, I could help you to
>>>> get started.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Nov 12, 2014 at 4:47 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Oh sorry, I just read the bug title. So my question is: when are you
>>>>> planning to add nested iterations?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Nov 12, 2014 at 4:45 PM, Maximilian Alber <
>>>>> alber.maximilian@gmail.com> wrote:
>>>>>
>>>>>> Ok, thanks.
>>>>>>
>>>>>> But does the bug cause Flink to "see" a nested iteration where none
>>>>>> is? Or is it a bug that nested iterations are not supported? If they
>>>>>> are not supported, when do you plan to add this feature?
>>>>>> I need nested iterations for my algorithm, so it would be nice to
>>>>>> know when I can expect them.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Wed, Nov 12, 2014 at 4:21 PM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I found the cause of the bug and have opened a JIRA to track it.
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/FLINK-1235
>>>>>>>
>>>>>>> You can watch that one to keep updated.
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 12, 2014 at 2:48 PM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> I am looking into it right now...
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Nov 11, 2014 at 2:09 PM, Maximilian Alber <
>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>> you already had time to investigate this issue?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On Tue, Oct 21, 2014 at 2:03 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey!
>>>>>>>>>>
>>>>>>>>>> Clearly, this looks like a bug. Let me investigate that and get
>>>>>>>>>> back to you later...
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 21, 2014 at 1:16 PM, Maximilian Alber <
>>>>>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flinksters!
>>>>>>>>>>>
>>>>>>>>>>> First some good news: the cumsum code from the last issue now
>>>>>>>>>>> works correctly and is tested.
>>>>>>>>>>>
>>>>>>>>>>> Bad news (at least for me): I just ran into this (for the error
>>>>>>>>>>> and code, see below). Do you have a roadmap for when this feature
>>>>>>>>>>> will be available? Regardless of the rest, I would need it in the
>>>>>>>>>>> near future.
>>>>>>>>>>>
>>>>>>>>>>> So far so good. But I wonder where this nested iteration is
>>>>>>>>>>> supposed to be. At least I do not see it... I have an iteration
>>>>>>>>>>> with a lot of filters/maps/etc. inside, but not another iteration.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> Error:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.compiler.CompilerException: An error occurred
>>>>>>>>>>> while translating the optimized plan to a nephele JobGraph: An
>>>>>>>>>>> error occurred while translating the optimized plan to a nephele
>>>>>>>>>>> JobGraph: Nested Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.program.Client.getJobGraph(Client.java:218)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:290)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:230)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: An error
>>>>>>>>>>> occurred while translating the optimized plan to a nephele
>>>>>>>>>>> JobGraph: Nested Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:543)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:95)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:195)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:398)
>>>>>>>>>>> ... 14 more
>>>>>>>>>>> Caused by: org.apache.flink.compiler.CompilerException: Nested
>>>>>>>>>>> Iterations are not possible at the moment!
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:395)
>>>>>>>>>>> ... 33 more
>>>>>>>>>>>
>>>>>>>>>>> Code:
>>>>>>>>>>>
>>>>>>>>>>> def createPlanFirstIteration(env: ExecutionEnvironment) = {
>>>>>>>>>>>   val X = env readTextFile config.xFile map
>>>>>>>>>>>     {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>>   val residual = env readTextFile config.yFile map
>>>>>>>>>>>     {Vector.parseFromString(_)}
>>>>>>>>>>>   val randoms = env readTextFile config.randomFile map
>>>>>>>>>>>     {Vector.parseFromString(_)}
>>>>>>>>>>>   val widthCandidates = env readTextFile config.widthCandidatesFile map
>>>>>>>>>>>     {Vector.parseFromString(config.dimensions, _)}
>>>>>>>>>>>
>>>>>>>>>>>   val center = calcCenter(env, X, residual, randoms, 0)
>>>>>>>>>>>
>>>>>>>>>>>   val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>>>>>>>>>>
>>>>>>>>>>>   x map { _ toString } writeAsText config.outFile
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>>>>     residual: DataSet[Vector], randoms: DataSet[Vector],
>>>>>>>>>>>     iteration: Int): DataSet[Vector] = {
>>>>>>>>>>>   val residual_2 = residual * residual
>>>>>>>>>>>   val ys = (residual_2 sumV) * (randoms filter {_.id == iteration} neutralize)
>>>>>>>>>>>
>>>>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>>   val sumVector = env.fromCollection(Seq(Vector.zeros(1)))
>>>>>>>>>>>   val cumSum = emptyDataSet.iterateDelta(sumVector union residual_2,
>>>>>>>>>>>       config.N+1, Array("id")) {
>>>>>>>>>>>     (solutionset, workset) =>
>>>>>>>>>>>       val current = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>>>         def filter(x: Vector) =
>>>>>>>>>>>           x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>>       })
>>>>>>>>>>>       val old_sum = workset filter {_.id == -1}
>>>>>>>>>>>       val sum = VectorDataSet.add(old_sum, current.neutralize())
>>>>>>>>>>>
>>>>>>>>>>>       val new_workset = workset filter {_.id != -1} union sum
>>>>>>>>>>>       (sum map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>         def map(x: Vector) =
>>>>>>>>>>>           new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>>       }),
>>>>>>>>>>>       new_workset)
>>>>>>>>>>>   }
>>>>>>>>>>>   val index = cumSum.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>>     var y: Vector = null
>>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>>       y = getRuntimeContext.getBroadcastVariable("ys").toList.head
>>>>>>>>>>>     }
>>>>>>>>>>>     def filter(x: Vector) = x.values(0) < y.values(0)
>>>>>>>>>>>   }).withBroadcastSet(ys, "ys") map {x: Vector => Tuple1(1)} sum 0
>>>>>>>>>>>
>>>>>>>>>>>   val center = X.filter(new RichFilterFunction[Vector](){
>>>>>>>>>>>     var index: Int = -1
>>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>>       val x: Tuple1[Int] =
>>>>>>>>>>>         getRuntimeContext.getBroadcastVariable("index").toList.head
>>>>>>>>>>>       index = x._1
>>>>>>>>>>>     }
>>>>>>>>>>>     def filter(x: Vector) = x.id == index
>>>>>>>>>>>   }).withBroadcastSet(index, "index")
>>>>>>>>>>>
>>>>>>>>>>>   center neutralize
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> def getKernelVector(X: DataSet[Vector], center: DataSet[Vector],
>>>>>>>>>>>     width: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>>   X.map(new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>     var center: Vector = null
>>>>>>>>>>>     var width: Vector = null
>>>>>>>>>>>     override def open(config: Configuration) = {
>>>>>>>>>>>       center = getRuntimeContext.getBroadcastVariable("center").toList.head
>>>>>>>>>>>       width = getRuntimeContext.getBroadcastVariable("width").toList.head
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     def map(x: Vector) = new Vector(x.id,
>>>>>>>>>>>       Array(Math.exp(-((((x-center)*(x-center))/width).values.sum)).toFloat))
>>>>>>>>>>>   }).withBroadcastSet(center, "center").withBroadcastSet(width, "width")
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> def calcWidthHeight(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>>>>>>>>     residual: DataSet[Vector], widthCandidates: DataSet[Vector],
>>>>>>>>>>>     center: DataSet[Vector]): DataSet[Vector] = {
>>>>>>>>>>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>>>>>>>>   val costs = emptyDataSet.iterateDelta(widthCandidates,
>>>>>>>>>>>       config.NWidthCandidates, Array("id")) {
>>>>>>>>>>>     (solutionset, workset) =>
>>>>>>>>>>>       val currentWidth = workset filter (new RichFilterFunction[Vector]{
>>>>>>>>>>>         def filter(x: Vector) =
>>>>>>>>>>>           x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>>>>>>>>       })
>>>>>>>>>>>
>>>>>>>>>>>       val kernelVector = getKernelVector(X, center, currentWidth)
>>>>>>>>>>>
>>>>>>>>>>>       val x1 = kernelVector dot residual map {x => x*x}
>>>>>>>>>>>       val x2 = kernelVector dot kernelVector
>>>>>>>>>>>
>>>>>>>>>>>       val cost = (x1 / x2) neutralize
>>>>>>>>>>>
>>>>>>>>>>>       (cost map (new RichMapFunction[Vector, Vector]{
>>>>>>>>>>>         def map(x: Vector) =
>>>>>>>>>>>           new Vector(getIterationRuntimeContext.getSuperstepNumber-1, x.values)
>>>>>>>>>>>       }),
>>>>>>>>>>>       workset)
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   // todo: will not work
>>>>>>>>>>>   //val width = costs max(0)
>>>>>>>>>>>
>>>>>>>>>>>   //val kernelVector = getKernelVector(X, center, width)
>>>>>>>>>>>
>>>>>>>>>>>   //val x1 = kernelVector dot residual
>>>>>>>>>>>   //val x2 = kernelVector dot kernelVector
>>>>>>>>>>>   //val height = x1 / x2
>>>>>>>>>>>   costs
>>>>>>>>>>> }
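For orientation, the cumSum delta iteration in calcCenter appears to build a running sum over the id-ordered residual-squared values, one element per superstep, and the index is then the count of cumulative sums below the broadcast threshold ys. If that reading is right, the same computation on a local sequence is just a scanLeft (plain Scala sketch, no Flink; `CumSumSketch` and both method names are illustrative):

```scala
object CumSumSketch {
  // Running sum over id-ordered values, like the delta iteration builds
  // one element per superstep.
  def cumSum(values: Seq[Double]): Seq[Double] =
    values.scanLeft(0.0)(_ + _).tail

  // Count of cumulative sums strictly below a threshold; this mirrors
  // the broadcast-"ys" filter plus count that picks the center's index.
  def indexBelow(values: Seq[Double], threshold: Double): Int =
    cumSum(values).count(_ < threshold)

  def main(args: Array[String]): Unit = {
    println(cumSum(Seq(1.0, 2.0, 3.0)))          // List(1.0, 3.0, 6.0)
    println(indexBelow(Seq(1.0, 2.0, 3.0), 4.0)) // 2
  }
}
```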
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
