flink-user mailing list archives

From: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: Forced to use Solution Set in Step Function
Date: Tue, 14 Oct 2014 15:21:36 GMT
Maybe you could use the residual_2 data set as a broadcast data set,
i.e., make it available in the operation that adds the residual for the
current iteration number to the old_sum. (I'm not sure what the
VectorDataSet.add() method does here.) If you sent me the complete
code, I could try to find an elegant solution to the problem.
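
Roughly something like this (untested sketch; I'm guessing that
Vector.values is an Array[Double] that can be added element-wise --
adjust to what VectorDataSet.add actually does):

import org.apache.flink.api.common.functions.RichMapFunction
import scala.collection.JavaConverters._

val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
  (solutionset, old_sum) =>
    // This map is driven by the workset, so it runs in every superstep
    // and getIterationRuntimeContext is available; residual_2 enters the
    // loop body as a broadcast variable instead of as a plain
    // (loop-invariant) input.
    val sum = old_sum.map(new RichMapFunction[Vector, Vector] {
      def map(x: Vector) = {
        val step = getIterationRuntimeContext.getSuperstepNumber
        getRuntimeContext.getBroadcastVariable[Vector]("residual_2")
          .asScala.find(_.id == step) match {
          case Some(r) =>
            // element-wise add; adjust to your Vector API
            new Vector(x.id, (x.values, r.values).zipped map (_ + _))
          case None => x // no residual for this step: keep the old sum
        }
      }
    }).withBroadcastSet(residual_2, "residual_2")
    // re-tag the delta with the superstep number as in your code
    (sum, sum)
}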

On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <sewen@apache.org> wrote:
> That is an interesting case. Everything that is loop invariant is computed
> once outside the loop. You are looking for a way to make this part of the
> loop.
>
> Can you try making the filter part of the "VectorDataSet.add(old_sum,
> current)" operation?
>
> On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
> <alber.maximilian@gmail.com> wrote:
>>
>> The delta iteration calculates the cumulative sum (prefix sum) of
>> residual_2.
>>
>> I'm not sure if I got your idea. I could add residual_2 to the workset.
>> But in that case I need to separate the "old_sum" (the sum up to now)
>> from residual_2 in each iteration. Actually I had it that way before;
>> then I tried the Scala closure to clean up the code.
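>>
>> E.g. (untested; "workset" here being the combined data set, with the
>> running sum kept under a reserved id like the -1 my initial vector
>> already has):
>>
>> val old_sum   = workset filter { _.id == -1 } // the running sum
>> val residuals = workset filter { _.id != -1 } // the residual_2 part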
>>
>> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>
>>> Yep, I see your point.
>>> Conceptually, all data that changes and affects the result of an
>>> iteration should be part of the workset.
>>> Hence, the model kind of assumes that a datum such as the superstep
>>> number should be part of the workset.
>>>
>>> I am not familiar with your application, but would it make sense to add
>>> the number as an additional attribute to the workset data set and increase
>>> it manually?
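>>>
>>> For example (untested sketch; your Vector API is guessed, i.e. an Int
>>> id and element-wise addable values):
>>>
>>> // Workset = one (nextStep, runningSum) pair; the counter replaces
>>> // getSuperstepNumber and is incremented by hand in each iteration.
>>> val initial = sumVector map { v => (1, v) }
>>> val cumSum = emptyDataSet.iterateDelta(initial, config.N, Array("id")) {
>>>   (solutionset, old) =>
>>>     val sum = old.join(residual_2).where(_._1).equalTo(_.id) {
>>>       (s, r) =>
>>>         // element-wise add; adjust to what VectorDataSet.add does
>>>         (s._1 + 1,
>>>          new Vector(s._2.id, (s._2.values, r.values).zipped map (_ + _)))
>>>     }
>>>     // the delta keeps your tagging: id = step that was just processed
>>>     (sum map { p => new Vector(p._1 - 1, p._2.values) }, sum)
>>> }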
>>>
>>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:
>>>>
>>>> OK, sounds right, but somehow I would like to execute it inside the
>>>> iteration. So I probably need to do some dummy work to make it part
>>>> of it?
>>>>
>>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>>
>>>>> Dammit, you beat me to it. But yes, this is exactly what I was just
>>>>> writing.
>>>>>
>>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <fhueske@apache.org>
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I'm not super familiar with the iterations, but my guess would be
>>>>> > that the filter is not evaluated as part of the iteration.
>>>>> > Since it is not connected to the workset, the filter is not part
>>>>> > of the loop and is evaluated once outside, where no superstep
>>>>> > number is available.
>>>>> > I guess moving the filter outside of the loop would give the same
>>>>> > error.
>>>>> >
>>>>> > Cheers, Fabian
>>>>> >
>>>>> >
>>>>> >
>>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>>>>> > <alber.maximilian@gmail.com>:
>>>>> >>
>>>>> >> Hmm, or maybe not. With this code I get a strange error:
>>>>> >>
>>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>>>>> >>   val X = env readTextFile config.xFile map
>>>>> >>     { Vector.parseFromString(config.dimensions, _) }
>>>>> >>   val residual = env readTextFile config.yFile map
>>>>> >>     { Vector.parseFromString(_) }
>>>>> >>   val randoms = env readTextFile config.randomFile map
>>>>> >>     { Vector.parseFromString(_) }
>>>>> >>
>>>>> >>   val residual_2 = residual * residual
>>>>> >>   val ys = (residual_2 sumV) * (randoms filter { _.id == 0 })
>>>>> >>
>>>>> >>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>>>> >>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>>>> >>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>>>>> >>     (solutionset, old_sum) =>
>>>>> >>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>>>>> >>         def filter(x: Vector) =
>>>>> >>           x.id == getIterationRuntimeContext.getSuperstepNumber
>>>>> >>       })
>>>>> >>       val sum = VectorDataSet.add(old_sum, current)
>>>>> >>
>>>>> >>       (sum map (new RichMapFunction[Vector, Vector] {
>>>>> >>         def map(x: Vector) =
>>>>> >>           new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>>>>> >>       }),
>>>>> >>       sum)
>>>>> >>   }
>>>>> >>
>>>>> >> Error:
>>>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>>>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>> >>   at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >>   at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >>   at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >>   at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >>   at java.lang.Thread.run(Thread.java:745)
>>>>> >>
>>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>>>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>>>>> >>   at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>>>>> >>   at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>> >>   at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>>>>> >>   at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>>>>> >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>> >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>>>>> >>   at java.lang.Thread.run(Thread.java:745)
>>>>> >>
>>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>>>>> >> <alber.maximilian@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Should work now.
>>>>> >>> Cheers
>>>>> >>>
>>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>>>>> >>> <alber.maximilian@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Ok, thanks.
>>>>> >>>> Please let me know when it is fixed.
>>>>> >>>>
>>>>> >>>> Cheers
>>>>> >>>> Max
>>>>> >>>>
>>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <sewen@apache.org>
>>>>> >>>> wrote:
>>>>> >>>>>
>>>>> >>>>> Thank you, I will look into that...
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>
>>>>
>>>
>>
>
