flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 14:57:59 GMT
Yep, I see your point.
Conceptually, all data that changes and affects the result of an iteration
should be part of the workset.
Hence, the model effectively assumes that a datum such as the superstep number
should be part of the workset.
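To make that contract concrete, here is a minimal plain-Scala sketch (no Flink dependency; all names are illustrative, not the Flink API): the runtime hands the superstep number only to the step function, so any value that should vary per superstep has to flow through the workset.

```scala
// Plain-Scala sketch of the delta-iteration contract (not the Flink API):
// the "runtime" passes the superstep number to the step function only; an
// operator defined outside the step never sees it.
case class Element(id: Int, value: Double)

def runDeltaIteration(initialWorkset: Seq[Element], maxSupersteps: Int)(
    step: (Seq[Element], Int) => Seq[Element]): Seq[Element] = {
  var workset = initialWorkset
  var superstep = 1
  while (workset.nonEmpty && superstep <= maxSupersteps) {
    workset = step(workset, superstep) // superstep supplied by the runtime
    superstep += 1
  }
  workset
}

// The step function sees (workset, superstep) on every pass.
val seen = scala.collection.mutable.ArrayBuffer.empty[Int]
val result = runDeltaIteration(Seq(Element(1, 1.0), Element(2, 2.0)), 3) {
  (ws, step) =>
    seen += step
    if (step < 3) ws else Seq.empty
}
```

The iteration terminates either when the workset becomes empty or when the maximum superstep count is reached, mirroring the termination behavior of a delta iteration.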

I am not familiar with your application, but would it make sense to add the
number as an additional attribute to the workset data set and increase it
manually?
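One way to sketch that workaround, again in plain Scala without Flink (the `step` field name is made up for illustration): store the counter in each workset element and bump it by hand on every pass of the step function.

```scala
// Sketch of carrying the superstep counter inside the workset elements
// (plain Scala, illustrative names): instead of asking the runtime via
// getIterationRuntimeContext, each element records how many steps it has seen.
case class Counted(step: Int, value: Double)

// One pass of the step function: do the real work, then bump the counter.
def nextWorkset(workset: Seq[Counted]): Seq[Counted] =
  workset.map(e => e.copy(step = e.step + 1))

val start = Seq(Counted(0, 1.5), Counted(0, -0.5))
val afterTwoSteps = nextWorkset(nextWorkset(start))
// every element now records that two supersteps have run
```

Because the counter travels with the data, any operator that consumes the workset can read it, regardless of where that operator was defined.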

2014-10-14 16:45 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:

> Ok, that sounds right, but I would still like to execute it inside the
> iteration. So I probably need to do some dummy work to make it part of the
> workset?
>
> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Dammit you beat me to it. But yes, this is exactly what I was just
>> writing.
>>
>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <fhueske@apache.org>
>> wrote:
>> > Hi,
>> >
>> > I'm not super familiar with the iterations, but my guess would be that
>> > the filter is not evaluated as part of the iteration.
>> > Since it is not connected to the workset, the filter is not part of the
>> > loop and is evaluated once outside of it, where no superstep number is
>> > available.
>> > I guess moving the filter outside of the loop would give the same error.
>> >
>> > Cheers, Fabian
>> >
>> >
>> >
>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:
>> >>
>> >> Hmm or maybe not. With this code I get some strange error:
>> >>
>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>> >>   val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>> >>   val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>> >>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>> >>
>> >>   val residual_2 = residual * residual
>> >>   val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>> >>
>> >>   val emptyDataSet = env.fromCollection[Vector](Seq())
>> >>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>> >>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>> >>     (solutionset, old_sum) =>
>> >>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>> >>         def filter(x: Vector) = x.id == getIterationRuntimeContext.getSuperstepNumber
>> >>       })
>> >>       val sum = VectorDataSet.add(old_sum, current)
>> >>
>> >>       (sum map (new RichMapFunction[Vector, Vector] {
>> >>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>> >>       }),
>> >>       sum)
>> >>   }
>> >> }
>> >>
>> >> Error:
>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >> at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >> at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >> at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >> at java.lang.Thread.run(Thread.java:745)
>> >>
>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >> at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >> at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >> at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >> at java.lang.Thread.run(Thread.java:745)
>> >>
>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>> >> <alber.maximilian@gmail.com> wrote:
>> >>>
>> >>> Should work now.
>> >>> Cheers
>> >>>
>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>> >>> <alber.maximilian@gmail.com> wrote:
>> >>>>
>> >>>> Ok, thanks.
>> >>>> Please let me know when it is fixed.
>> >>>>
>> >>>> Cheers
>> >>>> Max
>> >>>>
>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <sewen@apache.org> wrote:
>> >>>>>
>> >>>>> Thank you, I will look into that...
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
