flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Wed, 08 Oct 2014 09:39:00 GMT
Yes, you can refer to outside datasets in an iteration.
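
A minimal sketch of what that can look like, assuming the Scala DataSet API used later in this thread; the data sets constants and start and the broadcast-set name are made up purely for illustration. A DataSet built outside the iteration is captured by the step function's closure and, where a task needs to see all of it, additionally attached as a broadcast set:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import scala.collection.JavaConverters._

val env = ExecutionEnvironment.getExecutionEnvironment

// Built outside the iteration; only referenced from within the step function.
val constants: DataSet[Double] = env.fromElements(0.5, 1.5)
val start: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)

val result = start.iterate(10) { current =>
  // 'constants' is captured via the closure and attached as a broadcast set,
  // so every parallel task can read it in full inside the rich function.
  current.map(new RichMapFunction[Double, Double] {
    def map(x: Double) = {
      val cs = getRuntimeContext.getBroadcastVariable[Double]("constants").asScala
      x + cs.sum
    }
  }).withBroadcastSet(constants, "constants")
}

Plain closure access is enough for wiring the outside data set into joins, unions, or maps inside the step function; the broadcast set is only needed when a single rich function has to read the whole outside data set at once.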

On Wed, Oct 8, 2014 at 10:37 AM, Maximilian Alber
<alber.maximilian@gmail.com> wrote:
> It just came to my mind: is it possible to have broadcast sets inside the
> iteration functions with datasets which are "located" outside of it (via
> closure)?
>
> The basic shape of my iteration is that I have a dataset which gets
> altered and is needed in each iteration, i.e. a working set. In my case I
> also have a constant dataset which does not get modified (that messes up
> the code) and a resulting dataset which is not needed inside the step
> function. Thus it is similar to iterate with delta.
>
> Cheers,
> Max
>
>
> On Wed, Oct 8, 2014 at 10:26 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>>
>> Hmm, what it really needs is a different kind of iteration primitive.
>> Basically a bulk iteration where you can output values in each
>> iteration that get collected.
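
One way to approximate such a primitive with the existing bulk iteration, sketched here with made-up (alreadyEmitted, value) pairs and a toy update rule: tag everything a superstep wants to output, carry the tagged values along inside the iterated data set, and filter them back out after the iteration:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// (alreadyEmitted, value); only non-emitted elements are "live" work.
val start = env.fromElements((false, 1.0), (false, 2.0))

val iterated = start.iterate(10) { ds =>
  val work    = ds filter { t => !t._1 }                 // current working set
  val emitted = ds filter { t => t._1 }                  // outputs of earlier supersteps
  val nextWork   = work map { t => (false, t._2 / 2) }   // next working set
  val newOutputs = work map { t => (true, t._2) }        // this superstep's "output"
  nextWork union emitted union newOutputs
}

// All per-iteration outputs, collected once the loop has finished.
val collected = iterated filter { t => t._1 }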
>>
>> On Wed, Oct 8, 2014 at 10:02 AM, Maximilian Alber
>> <alber.maximilian@gmail.com> wrote:
>> > Hi!
>> >
>> > Hmm, I don't think so. I have two datasets which I cannot really merge
>> > together. After some thinking, this solution was the only one I found
>> > for solving my problem:
>> > I have a DataSet of Vector (in this case just of length one); each has
>> > an id and an array of values. Out of that I would like to compute the
>> > prefix sums, a.k.a. the cumulative sums. To do that I need to keep both
>> > the dataset with the vectors and the dataset where I store the sums.
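
A rough sketch of that prefix-sum idea written as a bulk iteration, using plain (tag, id, value) tuples as a stand-in for the Vector class from the code further down; every name and the toy input are illustrative only. The running sum and the already finished cumulative sums travel inside the iterated data set, so no solution set is involved:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichFilterFunction

val env = ExecutionEnvironment.getExecutionEnvironment

// tag 0 = input element, tag 1 = running sum, tag 2 = finished prefix sum
val n = 4
val input = env.fromCollection((1 to n).map(i => (0, i, i.toDouble)))
  .union(env.fromElements((1, 0, 0.0)))              // running sum starts at zero

val result = input.iterate(n) { ds =>
  // the input element whose id matches the current superstep
  val current = ds.filter(new RichFilterFunction[(Int, Int, Double)] {
    def filter(t: (Int, Int, Double)) =
      t._1 == 0 && t._2 == getIterationRuntimeContext.getSuperstepNumber
  })
  val running = ds filter { t => t._1 == 1 }
  val rest    = ds filter { t => t._1 != 1 }

  // new running sum = old running sum + current element
  val newRunning = (running union current) reduce { (a, b) =>
    (1, math.max(a._2, b._2), a._3 + b._3)
  }
  // also emit it as the finished prefix sum for this position
  val finished = newRunning map { t => (2, t._2, t._3) }

  rest union newRunning union finished
}

val prefixSums = result filter { t => t._1 == 2 }    // the cumulative sums

Like the delta-iteration version quoted below, this consumes one element per superstep, so it is a direct translation of the same idea rather than a more parallel formulation.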
>> >
>> > In the Scala version, could I use a dataset inside the iteration
>> > without passing it as the solution set or work set, just via closures?
>> >
>> > Maybe a flag to disable the check would be suitable?
>> >
>> > Thanks!
>> > Cheers,
>> > Max
>> >
>> > On Tue, Oct 7, 2014 at 4:34 PM, Stephan Ewen <sewen@apache.org> wrote:
>> >>
>> >> Hey!
>> >>
>> >> Is the algorithm you are using in fact a delta iteration? If you do
>> >> not actually use the solution set, can you model it as a bulk
>> >> iteration?
>> >>
>> >> If you actually need the solution set to accumulate data, we can
>> >> probably deactivate that check in the compiler. As far as I remember,
>> >> there is no requirement in the runtime to join with the solution set.
>> >> The check is meant to help programmers who forgot the join...
>> >>
>> >> Greetings,
>> >> Stephan
>> >>
>> >>
>> >>
>> >> On Tue, Oct 7, 2014 at 3:13 PM, Maximilian Alber
>> >> <alber.maximilian@gmail.com> wrote:
>> >>>
>> >>> Hi Flinksters!
>> >>>
>> >>> I would like to use the iterateDelta function. I don't need the
>> >>> solution set inside the step function, because I generate different
>> >>> values out of the working set. Unfortunately, the compiler of the
>> >>> development version doesn't like that. Is there a workaround?
>> >>>
>> >>> The code:
>> >>>
>> >>> val residual_2a = residual_2 union
>> >>>   env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>> >>> val emptyDataSet = env.fromCollection[Vector](Seq())
>> >>> val cumSum = emptyDataSet.iterateDelta(residual_2a, 1000000, Array("id")) {
>> >>>   (solutionset, workset) =>
>> >>>     val old_sum = workset filter {_.id == -1}
>> >>>     val current = workset filter (new RichFilterFunction[Vector] {
>> >>>       def filter(x: Vector) =
>> >>>         x.id == getIterationRuntimeContext.getSuperstepNumber
>> >>>     })
>> >>>     val residual_2 = workset filter {_.id != -1}
>> >>>     val sum = VectorDataSet.add(old_sum, current)
>> >>>
>> >>>     (sum map (new RichMapFunction[Vector, Vector] {
>> >>>        def map(x: Vector) =
>> >>>          new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>> >>>      }),
>> >>>      residual_2 union sum)
>> >>> }
>> >>>
>> >>> The error:
>> >>>
>> >>> org.apache.flink.compiler.CompilerException: Error: The step function
>> >>> does not reference the solution set.
>> >>> at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:868)
>> >>> at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:622)
>> >>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
>> >>> at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:202)
>> >>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:286)
>> >>> at org.apache.flink.api.common.Plan.accept(Plan.java:281)
>> >>> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:517)
>> >>> at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
>> >>> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
>> >>> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
>> >>> at org.apache.flink.client.program.Client.run(Client.java:285)
>> >>> at org.apache.flink.client.program.Client.run(Client.java:230)
>> >>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>> >>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>> >>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>> >>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> >>>
>> >>> Thanks!
>> >>> Cheers,
>> >>> Max
>> >>
>> >>
>> >
>
>
