flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: It is currently not supported to union between dynamic and static path in an iteration.
Date Wed, 10 Dec 2014 16:38:37 GMT
Thanks!

On Wed, Dec 10, 2014 at 4:35 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> Yes, that is a known limitation. It is sort of straightforward to resolve,
> but will take a bit of time. I will try to get to it by the end of the
> week.
>
> Stephan
>
>
> On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi flinksters,
>>
>> I'm really close to the end (at least I hope so), but I still have some
>> issues.
>> Writing my final loop I got this error:
>>
>> org.apache.flink.compiler.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: An error occurred while translating the optimized plan to a nephele JobGraph: Error: It is currently not supported to union between dynamic and static path in an iteration.
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>
>> I guess that I should use just the loop step set to create the next step
>> set?
>>
>> Here is the code:
>>
>> def createPlanFullIterations(env: ExecutionEnvironment) = {
>>    val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
>>    val X = tmp map {_._1}
>>    var residual = tmp map {_._2}
>>    val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>    val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
>>      env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>>    else
>>       null
>>
>>    val emptyDataSet = env.fromCollection[Vector](Seq())
>>    val model = emptyDataSet.iterate(config.iterations){
>>      stepSet =>
>>      val center = calcCenter(env, X, residual, randoms, -1)
>>
>>      val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>      val width = x._1
>>      val height = x._2
>>
>>      residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
>>        var height: Vector = null
>>        override def open(config: Configuration) = {
>>          height = getRuntimeContext.getBroadcastVariable("height").toList.head
>>        }
>>        def map(x: Vector) = {x * height}
>>        }).withBroadcastSet(height, "height"))
>>
>>      val centerOut = center map {x => new Vector(0, x.values)}
>>      val widthOut = width map {x => new Vector(1, x.values)}
>>      val heightOut = height map {x => new Vector(2, x.values)}
>>      val stepModel = centerOut union widthOut union heightOut
>>
>>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>      def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>      }))
>>     }
>>
>>     model map { _ toString } writeAsText config.outFile
>> }
>>
>>
>> thanks!
>> Cheers,
>> Max
>>
>
>
