flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 15:05:12 GMT
The delta iteration computes the cumulative sum (prefix sum) of residual_2.

I'm not sure I got your idea. I could add residual_2 to the workset, but in
that case I would need to separate the "old_sum" (the sum so far) from
residual_2 in each iteration. Actually, I had it that way before; then I tried
the Scala closure to clean up the code.
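(Editor's note: the counter-in-the-workset idea discussed below can be sketched without Flink. The point is to carry the pair (sum, step) as the workset state and increment the step manually, instead of asking the runtime for the superstep number via getIterationRuntimeContext. The object name, data values, and loop structure here are illustrative only, not from the original program.)

```scala
// Plain-Scala simulation of a delta iteration computing the prefix sum
// of residual_2, with the superstep counter carried explicitly in the
// workset instead of read from getIterationRuntimeContext (which fails
// on operators that are not wired into the iteration).
object PrefixSumSketch {
  def prefixSums(residual2: Seq[Double]): Seq[Double] = {
    // Workset element: (runningSum, superstep). The second component
    // replaces getIterationRuntimeContext.getSuperstepNumber.
    var workset = (0.0, 1)
    var solution = List.empty[Double] // collected prefix sums

    while (workset._2 <= residual2.length) {
      val (oldSum, step) = workset
      val current = residual2(step - 1) // element selected by the counter
      val sum = oldSum + current
      solution = solution :+ sum
      workset = (sum, step + 1)         // increment the counter manually
    }
    solution
  }

  def main(args: Array[String]): Unit =
    println(prefixSums(Seq(1.0, 4.0, 9.0, 16.0)).mkString(", "))
}
```

Running it prints the prefix sums `1.0, 5.0, 14.0, 30.0` for the sample input.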

On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <fhueske@apache.org> wrote:

> Yep, I see your point.
> Conceptually, all data that changes and affects the result of an iteration
> should be part of the workset.
> Hence, the model kind of assumes that the datum "superStepNumber" should
> be part of the workset.
>
> I am not familiar with your application, but would it make sense to add
> the number as an additional attribute to the workset data set and increment
> it manually?
>
> 2014-10-14 16:45 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:
>
>> OK, that sounds right, but I would still like to execute it inside the
>> iteration. So I probably need to do some dummy work to make it part of it?
>>
>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Dammit you beat me to it. But yes, this is exactly what I was just
>>> writing.
>>>
>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <fhueske@apache.org>
>>> wrote:
>>> > Hi,
>>> >
>>> > I'm not super familiar with the iterations, but my guess is that the
>>> > filter is not evaluated as part of the iteration.
>>> > Since it is not connected to the workset, the filter is not part of the
>>> > loop and is evaluated once outside, where no superstep number is available.
>>> > I guess moving the filter outside of the loop would give the same error.
>>> >
>>> > Cheers, Fabian
>>> >
>>> >
>>> >
>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:
>>> >>
>>> >> Hmm or maybe not. With this code I get some strange error:
>>> >>
>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>>> >>   val X = env readTextFile config.xFile map {Vector.parseFromString(config.dimensions, _)}
>>> >>   val residual = env readTextFile config.yFile map {Vector.parseFromString(_)}
>>> >>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>> >>
>>> >>   val residual_2 = residual * residual
>>> >>   val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>>> >>
>>> >>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>> >>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>> >>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>>> >>     (solutionset, old_sum) =>
>>> >>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>>> >>         def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber)
>>> >>       })
>>> >>       val sum = VectorDataSet.add(old_sum, current)
>>> >>
>>> >>       (sum map (new RichMapFunction[Vector, Vector] {
>>> >>         def map(x: Vector) = new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>>> >>       }),
>>> >>       sum)
>>> >>   }
>>> >>
>>> >> Error:
>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>> >> at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>> >> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>> >> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>> >> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> >> at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>> >> at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>> >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>> >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>> >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>> >> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>> >> at java.lang.Thread.run(Thread.java:745)
>>> >>
>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>> >> at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>> >> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>> >> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>> >> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> >> at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>> >> at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>> >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>> >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>> >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>> >> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>> >> at java.lang.Thread.run(Thread.java:745)
>>> >>
>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>> >> <alber.maximilian@gmail.com> wrote:
>>> >>>
>>> >>> Should work now.
>>> >>> Cheers
>>> >>>
>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>> >>> <alber.maximilian@gmail.com> wrote:
>>> >>>>
>>> >>>> Ok, thanks.
>>> >>>> Please let me know when it is fixed.
>>> >>>>
>>> >>>> Cheers
>>> >>>> Max
>>> >>>>
>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <sewen@apache.org>
>>> wrote:
>>> >>>>>
>>> >>>>> Thank you, I will look into that...
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>
