flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Tue, 14 Oct 2014 20:40:28 GMT
Transferring to variables: unfortunately that is not possible right now, but we
are working on it.

On Tue, Oct 14, 2014 at 8:53 PM, Maximilian Alber
<alber.maximilian@gmail.com> wrote:
> @ Stephan: Thanks! So I'm gonna switch!
>
> Sorry, my bad. I will provide you with a sample by tomorrow morning.
>
> Yes, a workaround, because I cannot transfer them into variables. Can I do
> that by now (or will I ever be able to)?
>
> Maybe some explanation of my solution:
> - X is for me a matrix of shape (N, d), modeled in Flink as a DataSet of
> Vectors. Each Vector has an ID, which is the row number, and an array of
> numbers, the actual row.
> - Y is for me a matrix of shape (N, 1), thus actually a column vector.
> - old_sum is either a scalar if d == 1, or a row vector, i.e. a matrix of shape
> (1, N), or a DataSet with one Vector. (By now I have the convention to give
> them id -1; it comes from a former workaround...)
>
> The whole ID story comes from the fact that I need to know which items belong
> together in mathematical operations (see my zip function). You can look that
> up in util.scala; that's kind of my math library. I don't want to imagine the
> mess in Java :)
>
> Cheers
> Max
>
>
>
> On Tue, Oct 14, 2014 at 6:28 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>>
>> Could you maybe also give some examples for the input expected by your
>> program?
>>
>> Also, the residual DataSet contains several Vectors, while the sum (or
>> old_sum) DataSet always contains only 1 Vector. Correct?
>>
>> On Tue, Oct 14, 2014 at 6:04 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > BTW: The current master allows you to not join with the solution set, and
>> > only use it to accumulate data.
>> >
>> > On Tue, Oct 14, 2014 at 5:29 PM, Maximilian Alber
>> > <alber.maximilian@gmail.com> wrote:
>> >>
>> >> Ok, that's possible too.
>> >>
>> >> VectorDataSet is just Scala magic to ease my life (see below). If you want
>> >> to take a look, I appended the package. The main code is in BumpBoost.scala;
>> >> the vector stuff is in util.scala.
>> >> Thanks!
>> >>
>> >> class VectorDataSet(X: DataSet[Vector]) {
>> >>   def + (Y: DataSet[Vector]) = VectorDataSet.add(X, Y)
>> >>   def - (Y: DataSet[Vector]) = VectorDataSet.subt(X, Y)
>> >>   def * (Y: DataSet[Vector]) = VectorDataSet.mult(X, Y)
>> >>   def / (Y: DataSet[Vector]) = VectorDataSet.div(X, Y)
>> >>
>> >>   def zip(Y: DataSet[Vector]) = VectorDataSet.zip(X, Y)
>> >>   def dot(Y: DataSet[Vector]) = VectorDataSet.dot(X, Y)
>> >>   def sumV() = VectorDataSet.sumV(X)
>> >> }
>> >>
>> >> object VectorDataSet {
>> >>   def add(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 + x._2}
>> >>   def subt(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 - x._2}
>> >>   def mult(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 * x._2}
>> >>   def div(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 zip X2 map {x => x._1 / x._2}
>> >>
>> >>   def zip(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 join X2 where "id" equalTo "id"
>> >>   def dot(X1: DataSet[Vector], X2: DataSet[Vector]) = X1 * X2 sumV
>> >>   def sumV(X1: DataSet[Vector]) = X1 reduce {_ + _}
>> >>
>> >>   implicit def vectorDataSetWrapper(ds: DataSet[Vector]) = new VectorDataSet(ds)
>> >> }
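[Editor's note: the semantics of the wrapper above can be illustrated with a small plain-Scala sketch. This is a hypothetical model, not Flink code: a "dataset" is a Seq of id-tagged vectors, zip is an id-equality join, and add works element-wise on the matched pairs.]

```scala
// Plain-Scala sketch of the VectorDataSet semantics (not Flink code).
// A "dataset" is a Seq of vectors; each vector carries an id (the row number).
case class Vec(id: Int, values: Array[Double])

// zip: id-equality join, like `X1 join X2 where "id" equalTo "id"` above
def zip(xs: Seq[Vec], ys: Seq[Vec]): Seq[(Vec, Vec)] =
  for (x <- xs; y <- ys if x.id == y.id) yield (x, y)

// add: element-wise sum of the matched rows, keeping the row id
def add(xs: Seq[Vec], ys: Seq[Vec]): Seq[Vec] =
  zip(xs, ys) map { case (x, y) =>
    Vec(x.id, x.values.zip(y.values).map { case (a, b) => a + b })
  }
```

The point is that rows are matched by id, not by position, so the two inputs may arrive in any order.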
>> >>
>> >>
>> >> On Tue, Oct 14, 2014 at 5:21 PM, Aljoscha Krettek <aljoscha@apache.org>
>> >> wrote:
>> >>>
>> >>> Maybe you could use the residual_2 data set as a broadcast dataset,
>> >>> i.e. make it available in the operation that adds the residual for the
>> >>> current iteration number to the old_sum. (I'm not sure what the
>> >>> VectorDataSet.add() method does here.) If you gave me the complete
>> >>> code I could try finding an elegant solution to that problem.
>> >>>
>> >>> On Tue, Oct 14, 2014 at 5:15 PM, Stephan Ewen <sewen@apache.org>
>> >>> wrote:
>> >>> > That is an interesting case. Everything that is loop invariant is
>> >>> > computed once outside the loop. You are looking for a way to make this
>> >>> > part of the loop.
>> >>> >
>> >>> > Can you try making the filter part of the
>> >>> > "VectorDataSet.add(old_sum, current)" operation?
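[Editor's note: Stephan's suggestion of folding the superstep-dependent selection into the add operation itself can be sketched in plain Scala. This is a hypothetical sketch of the idea, not Flink code; the names are invented.]

```scala
// Plain-Scala sketch (not Flink code): instead of a separate, loop-invariant
// filter, select the current residual row inside the add operation itself.
case class Vec(id: Int, values: Array[Double])

// addCurrent: pick the residual row whose id equals the current superstep
// number and add it element-wise to the running sum.
def addCurrent(oldSum: Vec, residuals: Seq[Vec], superstep: Int): Vec = {
  val current = residuals.find(_.id == superstep).get
  Vec(superstep, oldSum.values.zip(current.values).map { case (a, b) => a + b })
}
```

Because the selection now happens inside the per-step operation, there is no standalone filter left for the optimizer to hoist out of the loop.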
>> >>> >
>> >>> > On Tue, Oct 14, 2014 at 5:05 PM, Maximilian Alber
>> >>> > <alber.maximilian@gmail.com> wrote:
>> >>> >>
>> >>> >> The delta iteration calculates the cumulative sum (prefix sum) of
>> >>> >> residual_2.
>> >>> >>
>> >>> >> I'm not sure if I got your idea. I could add residual_2 to the workset,
>> >>> >> but in that case I would need to separate the "old_sum" (the sum up to
>> >>> >> now) and residual_2 in each iteration. Actually I had that before; then
>> >>> >> I tried the Scala closure to clean up the code.
>> >>> >>
>> >>> >> On Tue, Oct 14, 2014 at 4:57 PM, Fabian Hueske <fhueske@apache.org>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Yep, I see your point.
>> >>> >>> Conceptually, all data that changes and affects the result of an
>> >>> >>> iteration should be part of the workset.
>> >>> >>> Hence, the model kind of assumes that the datum "superStepNumber"
>> >>> >>> should be part of the workset.
>> >>> >>>
>> >>> >>> I am not familiar with your application, but would it make sense to
>> >>> >>> add the number as an additional attribute to the workset data set and
>> >>> >>> increase it manually?
>> >>> >>>
>> >>> >>> 2014-10-14 16:45 GMT+02:00 Maximilian Alber
>> >>> >>> <alber.maximilian@gmail.com>:
>> >>> >>>>
>> >>> >>>> Ok, sounds true, but somehow I would like to execute it inside of
>> >>> >>>> the iteration. So I probably need to do some nonsense work to make it
>> >>> >>>> part of it?
>> >>> >>>>
>> >>> >>>> On Tue, Oct 14, 2014 at 4:36 PM, Aljoscha Krettek
>> >>> >>>> <aljoscha@apache.org>
>> >>> >>>> wrote:
>> >>> >>>>>
>> >>> >>>>> Dammit, you beat me to it. But yes, this is exactly what I was
>> >>> >>>>> just writing.
>> >>> >>>>>
>> >>> >>>>> On Tue, Oct 14, 2014 at 4:35 PM, Fabian Hueske
>> >>> >>>>> <fhueske@apache.org>
>> >>> >>>>> wrote:
>> >>> >>>>> > Hi,
>> >>> >>>>> >
>> >>> >>>>> > I'm not super familiar with the iterations, but my guess would be
>> >>> >>>>> > that the filter is not evaluated as part of the iteration.
>> >>> >>>>> > Since it is not connected to the workset, the filter is not part of
>> >>> >>>>> > the loop and is evaluated once outside, where no superstep number
>> >>> >>>>> > is available.
>> >>> >>>>> > I guess moving the filter outside of the loop gives the same error.
>> >>> >>>>> >
>> >>> >>>>> > Cheers, Fabian
>> >>> >>>>> >
>> >>> >>>>> >
>> >>> >>>>> >
>> >>> >>>>> > 2014-10-14 16:18 GMT+02:00 Maximilian Alber
>> >>> >>>>> > <alber.maximilian@gmail.com>:
>> >>> >>>>> >>
>> >>> >>>>> >> Hmm, or maybe not. With this code I get some strange error:
>> >>> >>>>> >>
>> >>> >>>>> >> def createPlan_find_center(env: ExecutionEnvironment) = {
>> >>> >>>>> >>   val X = env readTextFile config.xFile map
>> >>> >>>>> >>     {Vector.parseFromString(config.dimensions, _)}
>> >>> >>>>> >>   val residual = env readTextFile config.yFile map
>> >>> >>>>> >>     {Vector.parseFromString(_)}
>> >>> >>>>> >>   val randoms = env readTextFile config.randomFile map
>> >>> >>>>> >>     {Vector.parseFromString(_)}
>> >>> >>>>> >>
>> >>> >>>>> >>   val residual_2 = residual * residual
>> >>> >>>>> >>   val ys = (residual_2 sumV) * (randoms filter {_.id == 0})
>> >>> >>>>> >>
>> >>> >>>>> >>   val emptyDataSet = env.fromCollection[Vector](Seq())
>> >>> >>>>> >>   val sumVector = env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>> >>> >>>>> >>   val cumSum = emptyDataSet.iterateDelta(sumVector, config.N, Array("id")) {
>> >>> >>>>> >>     (solutionset, old_sum) =>
>> >>> >>>>> >>       val current = residual_2 filter (new RichFilterFunction[Vector] {
>> >>> >>>>> >>         def filter(x: Vector) =
>> >>> >>>>> >>           x.id == getIterationRuntimeContext.getSuperstepNumber
>> >>> >>>>> >>       })
>> >>> >>>>> >>       val sum = VectorDataSet.add(old_sum, current)
>> >>> >>>>> >>
>> >>> >>>>> >>       (sum map (new RichMapFunction[Vector, Vector] {
>> >>> >>>>> >>         def map(x: Vector) =
>> >>> >>>>> >>           new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>> >>> >>>>> >>       }),
>> >>> >>>>> >>       sum)
>> >>> >>>>> >>   }
>> >>> >>>>> >>
>> >>> >>>>> >> Error:
>> >>> >>>>> >> 10/14/2014 15:57:35: Job execution switched to status RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([-1 0.0]) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: DataSource ([]) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:35: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to SCHEDULED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to DEPLOYING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to RUNNING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) -> Filter (bumpboost.BumpBoost$$anonfun$8$$anon$1) (1/1) switched to FAILED
>> >>> >>>>> >> java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >>> >>>>> >>   at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>> >>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>> >>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>> >>>>> >>   at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>> >>>>> >>   at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>> >>>>> >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>> >>>>> >>   at java.lang.Thread.run(Thread.java:745)
>> >>> >>>>> >>
>> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILING
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSink(TextOutputFormat (/tmp/tmplSYJ7S) - UTF-8) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: Sync (WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELING
>> >>> >>>>> >> 10/14/2014 15:57:36: Map (bumpboost.BumpBoost$$anonfun$8$$anon$2) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: SolutionSet Delta (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([]) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN DataSource (TextInputFormat (/tmp/tmpBhOsLd) - UTF-8) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: DataSource ([-1 0.0]) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: CHAIN Join(org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5) -> Map (org.apache.flink.api.scala.DataSet$$anon$1) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: IterationHead(WorksetIteration (Unnamed Delta Iteration)) (1/1) switched to CANCELED
>> >>> >>>>> >> 10/14/2014 15:57:36: Job execution switched to status FAILED
>> >>> >>>>> >> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> >>> >>>>> >>   at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> >>> >>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:40)
>> >>> >>>>> >>   at bumpboost.BumpBoost$$anonfun$8$$anon$1.filter(BumpBoost.scala:39)
>> >>> >>>>> >>   at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> >>> >>>>> >>   at org.apache.flink.api.scala.UnfinishedJoinOperation$$anon$5.join(joinDataSet.scala:184)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:140)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>> >>> >>>>> >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>> >>> >>>>> >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>> >>> >>>>> >>   at java.lang.Thread.run(Thread.java:745)
>> >>> >>>>> >>
>> >>> >>>>> >> On Tue, Oct 14, 2014 at 3:32 PM, Maximilian Alber
>> >>> >>>>> >> <alber.maximilian@gmail.com> wrote:
>> >>> >>>>> >>>
>> >>> >>>>> >>> Should work now.
>> >>> >>>>> >>> Cheers
>> >>> >>>>> >>>
>> >>> >>>>> >>> On Fri, Oct 10, 2014 at 3:38 PM, Maximilian Alber
>> >>> >>>>> >>> <alber.maximilian@gmail.com> wrote:
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Ok, thanks.
>> >>> >>>>> >>>> Please let me know when it is fixed.
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> Cheers
>> >>> >>>>> >>>> Max
>> >>> >>>>> >>>>
>> >>> >>>>> >>>> On Fri, Oct 10, 2014 at 1:34 PM, Stephan Ewen
>> >>> >>>>> >>>> <sewen@apache.org> wrote:
>> >>> >>>>> >>>>>
>> >>> >>>>> >>>>> Thank you, I will look into that...
>> >>> >>>>> >>>>
>> >>> >>>>> >>>>
>> >>> >>>>> >>>
>> >>> >>>>> >>
>> >>> >>>>> >
>> >>> >>>>
>> >>> >>>>
>> >>> >>>
>> >>> >>
>> >>> >
>> >>
>> >>
>> >
>
>
