flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 14:35:36 GMT
Hi,

I'm not super familiar with the iterations, but my guess is that the
filter is not evaluated as part of the iteration.
Since it is not connected to the workset, the filter is not part of the loop
and is evaluated once outside of it, where no superstep number is available.
I suspect that explicitly moving the filter outside of the loop would give the same error.

Cheers, Fabian
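The exception in the trace below comes from AbstractRichFunction.getIterationRuntimeContext, which throws whenever a rich function is executed outside an iteration's step function. The following is a minimal self-contained sketch of that mechanism; the model classes are illustrative stand-ins, not Flink's real API, but the exception message matches the one in the trace:

```java
// Toy model of the contract behind the error: the runtime only injects an
// IterationRuntimeContext into operators it placed INSIDE the step function.
class IterationRuntimeContextModel {
    private final int superstep;
    IterationRuntimeContextModel(int superstep) { this.superstep = superstep; }
    int getSuperstepNumber() { return superstep; }
}

abstract class RichFunctionModel {
    // Stays null for operators wired only to data sets created outside the loop.
    private IterationRuntimeContextModel iterationContext;

    void setIterationContext(IterationRuntimeContextModel ctx) {
        this.iterationContext = ctx;
    }

    IterationRuntimeContextModel getIterationRuntimeContext() {
        if (iterationContext == null) {
            // Same message as in the stack trace below.
            throw new IllegalStateException(
                "This stub is not part of an iteration step function.");
        }
        return iterationContext;
    }
}

public class SuperstepDemo {
    public static void main(String[] args) {
        RichFunctionModel outsideLoop = new RichFunctionModel() {};
        try {
            // Chained to a constant input, so no context was ever set.
            outsideLoop.getIterationRuntimeContext();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        RichFunctionModel insideLoop = new RichFunctionModel() {};
        insideLoop.setIterationContext(new IterationRuntimeContextModel(3));
        System.out.println(insideLoop.getIterationRuntimeContext().getSuperstepNumber());
    }
}
```

In the plan quoted below, `residual_2` is created outside `iterateDelta`, so the filter chained to it runs in the outer dataflow where no iteration context exists. One possible workaround, in line with Fabian's point, would be to make the filtered data depend on the workset (for example by joining `residual_2` with `old_sum`) so the optimizer places the operator inside the loop; this is a suggestion, not a confirmed fix from the thread.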



2014-10-14 16:18 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:

> Hmm or maybe not. With this code I get some strange error:
>
> def createPlan_find_center(env: ExecutionEnvironment) = {
>   val X = env readTextFile config.xFile map
>     { Vector.parseFromString(config.dimensions, _) }
>   val residual = env readTextFile config.yFile map
>     { Vector.parseFromString(_) }
>   val randoms = env readTextFile config.randomFile map
>     { Vector.parseFromString(_) }
>
>   val residual_2 = residual * residual
>   val ys = (residual_2 sumV) * (randoms filter { _.id == 0 })
>
>   val emptyDataSet = env.fromCollection[Vector](Seq())
>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>     (solutionset, old_sum) =>
>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>         def filter(x: Vector) =
>           x.id == getIterationRuntimeContext.getSuperstepNumber
>       })
>       val sum = VectorDataSet.add(old_sum, current)
>
>       (sum map (new RichMapFunction[Vector, Vector] {
>         def map(x: Vector) =
>           new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>       }),
>       sum)
>   }
>
> Error:
> 10/14/2014 15:57:35: Job execution switched to status RUNNING
> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) -
> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
> to SCHEDULED
> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) -
> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
> to DEPLOYING
> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta
> Iteration)) (1/1) switched to SCHEDULED
> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta
> Iteration)) (1/1) switched to DEPLOYING
> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) -
> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
> to RUNNING
> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta
> Iteration)) (1/1) switched to RUNNING
> 10/14/2014 15:57:35: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
> 10/14/2014 15:57:35: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter
> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
> java.lang.IllegalStateException: This stub is not part of an iteration
> step function.
> at
> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
> at
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
> at
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
> at
> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
> at
> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
> at
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
> at java.lang.Thread.run(Thread.java:745)
>
> 10/14/2014 15:57:36: Job execution switched to status FAILING
> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) -
> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
> to CANCELING
> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta
> Iteration)) (1/1) switched to CANCELING
> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1)
> (1/1) switched to CANCELED
> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8)
> (1/1) switched to CANCELED
> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration))
> (1/1) switched to CANCELED
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1)
> switched to CANCELED
> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) -
> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched
> to CANCELED
> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
> 10/14/2014 15:57:36: CHAIN
> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map
> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta
> Iteration)) (1/1) switched to CANCELED
> 10/14/2014 15:57:36: Job execution switched to status FAILED
> Error: The program execution failed: java.lang.IllegalStateException: This
> stub is not part of an iteration step function.
> at
> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
> at
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
> at
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
> at
> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
> at
> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
> at
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
> at java.lang.Thread.run(Thread.java:745)
>
> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Should work now.
>> Cheers
>>
>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Ok, thanks.
>>> Please let me know when it is fixed.
>>>
>>> Cheers
>>> Max
>>>
>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Thank you, I will look into that...
>>>>
>>>
>>>
>>
>
