flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: java.lang.IllegalStateException: This stub is not part of an iteration step function.
Date Thu, 08 Jan 2015 15:20:23 GMT
Hi Robert!

Ok, good to know. Thanks!

Cheers!
Max

On Thu, Jan 8, 2015 at 4:16 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Max,
>
> No. I think there is nobody in the Flink community who has plans to
> implement nested iterations in the near future.
>
> On Wed, Jan 7, 2015 at 10:58 AM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Thanks!
>>
>> I made a workaround using a pseudo join with the workset. But now I'm
>> back to the nested iteration issue. Is there any chance that this feature
>> will be available in the near future (2-3 weeks)?
>>
>> Cheers,
>> Max
>>
>> On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi,
>>> the problem is that your operations do not depend on the
>>> iteration-step-dataset. Your code could be rewritten like this to make it
>>> more obvious:
>>>
>>>  val emptyDataSet = env.fromCollection[Vector](Seq())
>>>  // here we call the function
>>>      val center = calcCenter(env, X, residual, randoms, -1)
>>>
>>>      val centerX = (X subtV center) map {_ square}
>>>      val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>>>      val width = x._1
>>>      val height = x._2
>>>
>>>      residual = residual - (getKernelVector(X, center, width) multV height)
>>>
>>>      val centerOut = center map {x => new Vector(0, x.values)}
>>>      val widthOut = width map {x => new Vector(1, x.values)}
>>>      val heightOut = height map {x => new Vector(2, x.values)}
>>>      val stepModel = centerOut union widthOut union heightOut
>>>
>>>    // here the loop begins
>>>    val model = emptyDataSet.iterate(config.iterations){
>>>      stepSet =>
>>>
>>>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>        def map(x: Vector) =
>>>          new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>>      }))
>>>     }
>>>
>>>     model map { _ toString } writeAsText config.outFile
>>> }
>>>
>>> This means that the system considers these operations to be outside the
>>> loop, so you don't have access to the IterationContext.
>>>
>>> Regards,
>>> Aljoscha
>>>
>>> On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber <
>>> alber.maximilian@gmail.com> wrote:
>>>
>>>> Hey Flinksters!
>>>>
>>>> I ran into this error:
>>>>
>>>> java.lang.IllegalStateException: This stub is not part of an iteration
>>>> step function.
>>>>
>>>> Below is my code; the relevant parts are marked. Is it a problem that
>>>> the stub is in a function that is called from the iteration step
>>>> function?
>>>>
>>>>
>>>> Code:
>>>>
>>>>     ......
>>>>
>>>>    val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>    // here the loop begins
>>>>    val model = emptyDataSet.iterate(config.iterations){
>>>>      stepSet =>
>>>>      // here we call the function
>>>>      val center = calcCenter(env, X, residual, randoms, -1)
>>>>
>>>>      val centerX = (X subtV center) map {_ square}
>>>>      val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>>>>      val width = x._1
>>>>      val height = x._2
>>>>
>>>>      residual = residual - (getKernelVector(X, center, width) multV height)
>>>>
>>>>      val centerOut = center map {x => new Vector(0, x.values)}
>>>>      val widthOut = width map {x => new Vector(1, x.values)}
>>>>      val heightOut = height map {x => new Vector(2, x.values)}
>>>>      val stepModel = centerOut union widthOut union heightOut
>>>>
>>>>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>>        def map(x: Vector) =
>>>>          new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>>>      }))
>>>>     }
>>>>
>>>>     model map { _ toString } writeAsText config.outFile
>>>> }
>>>>
>>>>
>>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector],
>>>>     residual: DataSet[Vector], randoms: DataSet[Vector],
>>>>     iteration: Int): DataSet[Vector] = {
>>>>     val residual_2 = residual * residual
>>>>      val randomValue = if(iteration >= 0)
>>>>         (randoms filter {_.id == iteration})
>>>>      else
>>>>        // and this filter function causes the error
>>>>        (randoms.filter(new RichFilterFunction[Vector]{
>>>>            def filter(x: Vector) =
>>>>              x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>>>        }))
>>>>     val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize))
>>>>
>>>>    .....
>>>>
>>>> The full error:
>>>>
>>>> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>     at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>     at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119)
>>>>     at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118)
>>>>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> I have attached my program along with the input files. To reproduce the
>>>> error, use the following command-line arguments. Please replace in_file,
>>>> random_file, and width_candidates with the provided files, and set
>>>> out_file to the path you want:
>>>>
>>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar
>>>> out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100
>>>> width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30
>>>> multi_bump_boost=0 gradient_descent_iterations=30 cache=False
>>>> start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08
>>>> max_width_update=10
>>>>
>>>> Thanks!
>>>> Cheers,
>>>> Max
>>>>
>>>
>>>
>>
>
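
Aljoscha's point in the quoted thread (a rich function can only reach the iteration context if its operator depends on the iteration-step dataset) can be illustrated with a minimal sketch. It is not taken from the BumpBoost program. It assumes a Flink version that still ships the Scala DataSet API, and the object name, the input value, and the output path are hypothetical.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

// Hypothetical demo object; not part of the program discussed in the thread.
object SuperstepDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical single-element input, for illustration only.
    val initial: DataSet[Int] = env.fromElements(0)

    val result = initial.iterate(3) { stepSet =>
      // This map consumes stepSet, so the system treats it as part of the
      // step function and the iteration runtime context is available.
      stepSet.map(new RichMapFunction[Int, Int] {
        override def map(x: Int): Int =
          x + getIterationRuntimeContext.getSuperstepNumber
      })
      // By contrast, a rich function applied to a dataset that does not
      // derive from stepSet is planned outside the loop, and calling
      // getIterationRuntimeContext there raises the IllegalStateException
      // reported in this thread.
    }

    result.writeAsText("/tmp/superstep-demo") // hypothetical output path
    env.execute("superstep demo")
  }
}
```

In the original program, the filter on randoms inside calcCenter does not consume stepSet, which is why it fails. The pseudo join with the workset mentioned above is one way to introduce that dependency.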
