flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 16:04:01 GMT
BTW: The current master allows you to not join with the solution set, and
only use it to accumulate data.

On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok, that's possible too.
>
> VectorDataSet is just scala magic to ease my life (See below). If you want
> to take a look, I appended the package. The main code is in
> BumpBoost.scala. In util.scala is the vector stuff.
> Thanks!
>
> class VectorDataSet(X: DataSet[Vector]){
> def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
> def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
> def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
> def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>
> def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
> def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
> def sumV() = VectorDataSet.sumV(X)
> }
> object VectorDataSet {
> def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
> x._1 + x._2}
> def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
> x._1 - x._2}
> def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
> x._1 * x._2}
> def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x =>
> x._1 / x._2}
>
> def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id"
> equalTo "id"
> def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
> def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>
> implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new
> VectorDataSet(ds)
> }
>
>
> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Maybe you could use the residual_2 data set as a broadcast dataset.
>> i.e. make in available in the operation that adds the residual for the
>> current iteration number to the old_sum. (I'm not sure what the
>> VectorDataSet.add() method does here). If you gave me the complete
>> code I could try finding an elegant solution to that problem.
>>
>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > That is an interesting case. Everything that is loop invariant is
>> computed
>> > once outside the loop. You are looking for a way to make this part of
>> the
>> > loop.
>> >
>> > Can you try making the filter part of the "VectorDataSet.add(old_sum,
>> > current)" operation?
>> >
>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>> > <alber.maximilian@gmail.com> wrote:
>> >>
>> >> The deltaiteration calculates the cumulative sum (prefix sum) of
>> >> residual_2.
>> >>
>> >> I'm not sure if got you idea. I could add residual_2 to the workset.
>> But
>> >> in that case I need to separate the "old_sum"(the sum up to now) and
>> >> residual_2 each iteration. Actually I had that before, then I tried the
>> >> Scala closure to clean up the code.
>> >>
>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <fhueske@apache.org>
>> wrote:
>> >>>
>> >>> Jep, I see you point.
>> >>> Conceptually, all data that changes and affects the result of an
>> >>> iteration should be part of the workset.
>> >>> Hence, the model kind of assumes that the datum "superStepNumber"
>> should
>> >>> be part of the workset.
>> >>>
>> >>> I am not familiar with your application, but would it make sense to
>> add
>> >>> the number as an additional attribute to the workset data set and
>> increase
>> >>> it manually?
>> >>>
>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber <
>> alber.maximilian@gmail.com>:
>> >>>>
>> >>>> Ok, sounds true, but somehow I would like to execute it inside of
>> it. So
>> >>>> I probably need to do some nonsense work to make it part of it?
>> >>>>
>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <
>> aljoscha@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>> Dammit you beat me to it. But yes, this is exactly what I was
just
>> >>>>> writing.
>> >>>>>
>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <fhueske@apache.org>
>> >>>>> wrote:
>> >>>>> > Hi,
>> >>>>> >
>> >>>>> > I'm not super familiar with the iterations, but my guess
would be
>> >>>>> > that the
>> >>>>> > filter is not evaluated as part of the iteration.
>> >>>>> > Since it is not connect to the workset, the filter is not
part of
>> the
>> >>>>> > loop
>> >>>>> > and evaluated once outside where no superset number is
available.
>> >>>>> > I guess, moving the filter outside of the loop gives the
same
>> error.
>> >>>>> >
>> >>>>> > Cheers, Fabian
>> >>>>> >
>> >>>>> >
>> >>>>> >
>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>> >>>>> > <alber.maximilian@gmail.com>:
>> >>>>> >>
>> >>>>> >> Hmm or maybe not. With this code I get some strange
error:
>> >>>>> >>
>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment)
= {
>> >>>>> >> val X = env readTextFile config.xFile map
>> >>>>> >> {Vector.parseFromString(config.dimensions, _)};
>> >>>>> >> val residual = env readTextFile config.yFile map
>> >>>>> >> {Vector.parseFromString(_)};
>> >>>>> >> val randoms = env readTextFile config.randomFile map
>> >>>>> >> {Vector.parseFromString(_)}
>> >>>>> >>
>> >>>>> >> val residual_2 = residual * residual
>> >>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id
== 0})
>> >>>>> >>
>> >>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
>> >>>>> >> val sumVector =
>> >>>>> >> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>> >>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector, config.N,
>> >>>>> >> Array("id")) {
>> >>>>> >>     (solutionset, old_sum) =>
>> >>>>> >>     val current = residual_2 filter (new
>> RichFilterFunction[Vector]{
>> >>>>> >>       def filter(x: Vector) = x.id ==
>> >>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
>> >>>>> >>     })
>> >>>>> >>     val sum = VectorDataSet.add(old_sum, current)
>> >>>>> >>
>> >>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
>> >>>>> >>       def map(x: Vector) = new
>> >>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber,
x.values)
>> >>>>> >>     }),
>> >>>>> >>     sum)
>> >>>>> >> }
>> >>>>> >>
>> >>>>> >> Error:
>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status
RUNNING
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched
to
>> >>>>> >> SCHEDULED
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched
to
>> >>>>> >> DEPLOYING
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched
to SCHEDULED
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched
to DEPLOYING
>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
(1/1)
>> >>>>> >> switched to
>> >>>>> >> SCHEDULED
>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
(1/1)
>> >>>>> >> switched to
>> >>>>> >> DEPLOYING
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched
to
>> RUNNING
>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
(Unnamed
>> Delta
>> >>>>> >> Iteration)) (1/1) switched to SCHEDULED
>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
(Unnamed
>> Delta
>> >>>>> >> Iteration)) (1/1) switched to DEPLOYING
>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat
>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
(1/1)
>> >>>>> >> switched to
>> >>>>> >> RUNNING
>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched
to RUNNING
>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration
(Unnamed
>> Delta
>> >>>>> >> Iteration)) (1/1) switched to RUNNING
>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) ->
Filter
>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
to
>> SCHEDULED
>> >>>>> >> 10/14/2014 15:57:35: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) ->
Filter
>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
to
>> DEPLOYING
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
switched to
>> >>>>> >> SCHEDULED
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
switched to
>> >>>>> >> DEPLOYING
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) ->
Filter
>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
to
>> RUNNING
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
switched to
>> >>>>> >> RUNNING
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) ->
Filter
>> >>>>> >> (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched
to FAILED
>> >>>>> >> java.lang.IllegalStateException: This stub is not part
of an
>> >>>>> >> iteration
>> >>>>> >> step function.
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>>>> >> at
>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>>>> >> at
>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>> >>>>> >>
>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
FAILING
>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched
to CANCELING
>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
(1/1)
>> >>>>> >> switched to
>> >>>>> >> CANCELING
>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched
to
>> >>>>> >> CANCELING
>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
(Unnamed
>> Delta
>> >>>>> >> Iteration)) (1/1) switched to CANCELING
>> >>>>> >> 10/14/2014 15:57:36: Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1)
>> >>>>> >> (1/1) switched to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S)
-
>> >>>>> >> UTF-8)
>> >>>>> >> (1/1) switched to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed
Delta
>> >>>>> >> Iteration))
>> >>>>> >> (1/1) switched to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
switched to
>> >>>>> >> CANCELING
>> >>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2)
>> >>>>> >> (1/1)
>> >>>>> >> switched to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched
to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched
to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat
>> >>>>> >> (/tmp/tmpBhOsLd) -
>> >>>>> >> UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1)
(1/1)
>> >>>>> >> switched to
>> >>>>> >> CANCELED
>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched
to
>> >>>>> >> CANCELED
>> >>>>> >> 10/14/2014 15:57:36: CHAIN
>> >>>>> >> Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5)
>> ->
>> >>>>> >> Map
>> >>>>> >> (org.apache.flink.api.scala.DataSet$$anon$1) (1/1)
switched to
>> >>>>> >> CANCELED
>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration
(Unnamed
>> Delta
>> >>>>> >> Iteration)) (1/1) switched to CANCELED
>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status
FAILED
>> >>>>> >> Error: The program execution failed:
>> >>>>> >> java.lang.IllegalStateException: This
>> >>>>> >> stub is not part of an iteration step function.
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>>>> >> at
>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>>>> >> at
>> bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>>>> >> at
>> >>>>> >>
>> >>>>> >>
>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>>>> >> at java.lang.Thread.run(Thread.java:745)
>> >>>>> >>
>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>> >>>>> >> <alber.maximilian@gmail.com> wrote:
>> >>>>> >>>
>> >>>>> >>> Should work now.
>> >>>>> >>> Cheers
>> >>>>> >>>
>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>> >>>>> >>> <alber.maximilian@gmail.com> wrote:
>> >>>>> >>>>
>> >>>>> >>>> Ok, thanks.
>> >>>>> >>>> Please let me know when it is fixed.
>> >>>>> >>>>
>> >>>>> >>>> Cheers
>> >>>>> >>>> Max
>> >>>>> >>>>
>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
<
>> sewen@apache.org>
>> >>>>> >>>> wrote:
>> >>>>> >>>>>
>> >>>>> >>>>> Thank you, I will look into that...
>> >>>>> >>>>
>> >>>>> >>>>
>> >>>>> >>>
>> >>>>> >>
>> >>>>> >
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>

Mime
View raw message