# flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 15:29:58 GMT
```Ok, that's possible too.

VectorDataSet is just Scala magic to ease my life (see below). If you want
to take a look, I attached the package. The main code is in
BumpBoost.scala. The vector stuff is in util.scala.
Thanks!

class VectorDataSet(X: DataSet[Vector]){
  def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
  def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
  def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
  def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)

  def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
  def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
  def sumV() = VectorDataSet.sumV(X)
}
object VectorDataSet {
  def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 + x._2}
  def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 - x._2}
  def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 * x._2}
  def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 / x._2}

  def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id" equalTo "id"
  def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
  def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}

  implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new VectorDataSet(ds)
}

On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Maybe you could use the residual_2 data set as a broadcast dataset.
> i.e. make it available in the operation that adds the residual for the
> current iteration number to the old_sum. (I'm not sure what the
> VectorDataSet.add() method does here). If you gave me the complete
> code I could try finding an elegant solution to that problem.
>
> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <sewen@apache.org> wrote:
> > That is an interesting case. Everything that is loop invariant is computed
> > once outside the loop. You are looking for a way to make this part of the
> > loop.
> >
> > Can you try making the filter part of the "VectorDataSet.add(old_sum,
> > current)" operation?
> >
> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
> > <alber.maximilian@gmail.com> wrote:
> >>
> >> The deltaiteration calculates the cumulative sum (prefix sum) of
> >> residual_2.
> >>
> >> I'm not sure if I got your idea. I could add residual_2 to the workset. But
> >> in that case I need to separate the "old_sum"(the sum up to now) and
> >> residual_2 each iteration. Actually I had that before, then I tried the
> >> Scala closure to clean up the code.
> >>
> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <fhueske@apache.org> wrote:
> >>>
> >>> Yep, I see your point.
> >>> Conceptually, all data that changes and affects the result of an
> >>> iteration should be part of the workset.
> >>> Hence, the model kind of assumes that the datum "superStepNumber" should
> >>> be part of the workset.
> >>>
> >>> I am not familiar with your application, but would it make sense to add
> >>> the number as an additional attribute to the workset data set and increase
> >>> it manually?
> >>>
> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber <alber.maximilian@gmail.com>:
> >>>>
> >>>> Ok, sounds true, but somehow I would like to execute it inside of it. So
> >>>> I probably need to do some nonsense work to make it part of it?
> >>>>
> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek <aljoscha@apache.org>
> >>>> wrote:
> >>>>>
> >>>>> Dammit you beat me to it. But yes, this is exactly what I was just
> >>>>> writing.
> >>>>>
> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske <fhueske@apache.org>
> >>>>> wrote:
> >>>>> > Hi,
> >>>>> >
> >>>>> > I'm not super familiar with the iterations, but my guess would be
> >>>>> > that the filter is not evaluated as part of the iteration.
> >>>>> > Since it is not connected to the workset, the filter is not part of
> >>>>> > the loop and is evaluated once outside, where no superstep number is
> >>>>> > available.
> >>>>> > I guess moving the filter outside of the loop gives the same error.
> >>>>> > Cheers, Fabian
> >>>>> >
> >>>>> >
> >>>>> >
> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
> >>>>> > <alber.maximilian@gmail.com>:
> >>>>> >>
> >>>>> >> Hmm or maybe not. With this code I get some strange error:
> >>>>> >>
> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
> >>>>> >> val X = env readTextFile config.xFile map
> >>>>> >> {Vector.parseFromString(config.dimensions, _)};
> >>>>> >> val residual = env readTextFile config.yFile map
> >>>>> >> {Vector.parseFromString(_)};
> >>>>> >> val randoms = env readTextFile config.randomFile map
> >>>>> >> {Vector.parseFromString(_)}
> >>>>> >>
> >>>>> >> val residual_2 = residual * residual
> >>>>> >> val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
> >>>>> >>
> >>>>> >> val emptyDataSet = env.fromCollection[Vector](Seq())
> >>>>> >> val sumVector =
> >>>>> >> env.fromCollection(Seq(Vector.zeros(config.dimensions)))
> >>>>> >> val cumSum = emptyDataSet.iterateDelta(sumVector, config.N,
> >>>>> >> Array("id")) {
> >>>>> >>     (solutionset, old_sum) =>
> >>>>> >>     val current = residual_2 filter (new RichFilterFunction[Vector]{
> >>>>> >>       def filter(x: Vector) = x.id ==
> >>>>> >> (getIterationRuntimeContext.getSuperstepNumber)
> >>>>> >>     })
> >>>>> >>     val sum = VectorDataSet.add(old_sum, current)
> >>>>> >>
> >>>>> >>     (sum map (new RichMapFunction[Vector, Vector]{
> >>>>> >>       def map(x: Vector) = new
> >>>>> >> Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
> >>>>> >>     }),
> >>>>> >>     sum)
> >>>>> >> }
> >>>>> >>
> >>>>> >> Error:
> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
> >>>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
> >>>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >>
> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
> >>>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
> >>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
> >>>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
> >>>>> >> at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >> at
> >>>>> >>
> >>>>> >>
> >>>>> >>
> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
> >>>>> >> <alber.maximilian@gmail.com> wrote:
> >>>>> >>>
> >>>>> >>> Should work now.
> >>>>> >>> Cheers
> >>>>> >>>
> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
> >>>>> >>> <alber.maximilian@gmail.com> wrote:
> >>>>> >>>>
> >>>>> >>>> Ok, thanks.
> >>>>> >>>> Please let me know when it is fixed.
> >>>>> >>>>
> >>>>> >>>> Cheers
> >>>>> >>>> Max
> >>>>> >>>>
> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen <sewen@apache.org>
> >>>>> >>>> wrote:
> >>>>> >>>>>
> >>>>> >>>>> Thank you, I will look into that...
> >>>>> >>>>
> >>>>> >>>>
> >>>>> >>>
> >>>>> >>
> >>>>> >
> >>>>
> >>>>
> >>>
> >>
> >
>

```
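
The suggestion in the thread to carry the step number in the workset, rather than reading it from the runtime context in an operator outside the loop, can be sketched roughly as follows. This is a plain-Scala model with no Flink dependency, not the code from BumpBoost.scala: `Vec`, `Work`, and `prefixSums` are hypothetical names, and the sketch assumes the residual ids start at 1 and line up with the step number.

```scala
// Plain-Scala model of the delta iteration discussed in the thread.
// All names here are hypothetical; no Flink API is used.
object PrefixSumSketch {
  // Stand-in for the thread's Vector type: an id plus its values.
  final case class Vec(id: Int, values: Seq[Double]) {
    // Element-wise sum; keeps the id of the left operand.
    def +(other: Vec): Vec =
      Vec(id, values.zip(other.values).map { case (a, b) => a + b })
  }

  // Workset element: the running sum plus the step counter it belongs to.
  // Because the counter travels with the workset, the filter on the
  // residuals depends on the workset and is re-evaluated every step.
  final case class Work(step: Int, sum: Vec)

  def prefixSums(residual: Seq[Vec], dims: Int, maxSteps: Int): Seq[Vec] = {
    // Assumption: steps are numbered from 1 and residual ids match them.
    val initial = Work(step = 1, sum = Vec(-1, Seq.fill(dims)(0.0)))
    val (solution, _) =
      (1 to maxSteps).foldLeft((Vector.empty[Vec], initial)) {
        case ((solutionSet, work), _) =>
          // The filter uses the counter stored in the workset element.
          residual.find(_.id == work.step) match {
            case Some(current) =>
              val newSum = Vec(work.step, (work.sum + current).values)
              // Emit the new prefix sum and increment the counter manually.
              (solutionSet :+ newSum, Work(work.step + 1, newSum))
            case None =>
              (solutionSet, work) // no residual for this step; nothing to add
          }
      }
    solution
  }
}
```

Calling `PrefixSumSketch.prefixSums` with residuals 1.0, 2.0, 3.0 under ids 1 to 3 yields the running sums 1.0, 3.0, 6.0 under the same ids. In an actual Flink job the same idea would mean that the operator doing the filtering takes the workset (`old_sum` in the thread) as an input, so that it becomes part of the step function.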