flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Forced to use Solution Set in Step Function
Date Wed, 08 Oct 2014 08:26:22 GMT
Hmm, what this really needs is a different kind of iteration primitive:
basically a bulk iteration in which each iteration can output values that
get collected.
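To make the proposal concrete, here is a minimal sketch in plain Scala over in-memory collections (no Flink involved). `iterateCollecting` and `prefixSums` are hypothetical names, not part of any Flink API: each superstep turns the current state into a new state plus some output records, and the outputs of all supersteps are concatenated. Stopping on an empty state or after `maxIterations` is an assumption of the sketch. The usage computes prefix sums of scalar values with ids 1..n and keeps the running sum in the state under id -1, as the code further down in this thread does.

```scala
object CollectingIteration {
  // Hypothetical primitive (not a Flink API): a bulk iteration whose step
  // function returns the next state plus records to emit. Emitted records
  // from all supersteps are collected. Supersteps are numbered from 1.
  def iterateCollecting[S, O](initial: Seq[S], maxIterations: Int)(
      step: (Int, Seq[S]) => (Seq[S], Seq[O])): Seq[O] = {
    var state = initial
    val collected = Seq.newBuilder[O]
    var superstep = 1
    // Stop when nothing is left to iterate on or the bound is reached.
    while (superstep <= maxIterations && state.nonEmpty) {
      val (next, out) = step(superstep, state)
      collected ++= out
      state = next
      superstep += 1
    }
    collected.result()
  }

  // Usage: prefix sums of values with ids 1..n. The running sum lives in the
  // state under id -1; the value whose id equals the superstep number is
  // consumed, and the new partial sum is emitted as (id, sum).
  def prefixSums(values: Seq[Double]): Seq[(Int, Double)] = {
    val init: Seq[(Int, Double)] =
      (-1, 0.0) +: values.zipWithIndex.map { case (v, i) => (i + 1, v) }
    iterateCollecting[(Int, Double), (Int, Double)](init, 1000000) { (k, state) =>
      val oldSum = state.collect { case (-1, s) => s }.sum
      val current = state.collect { case (id, v) if id == k => v }
      if (current.isEmpty) (Seq.empty, Seq.empty) // nothing left: empty state ends the loop
      else {
        val sum = oldSum + current.sum
        val rest = state.filter { case (id, _) => id > k }
        ((-1, sum) +: rest, Seq((k, sum)))
      }
    }
  }
}
```

Because a finished sum leaves the state as soon as it is emitted, later supersteps only carry the records they still need.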

On Wed, Oct 8, 2014 at 10:02 AM, Maximilian Alber
<alber.maximilian@gmail.com> wrote:
> Hi!
>
> Hmm, I don't think so. I have two datasets, which I cannot really merge
> together. After some thinking, this was the only solution I came up with
> for my problem:
> I have a DataSet of Vectors (in this case each of length one); each has an
> id and an array of values. From that I would like to compute the prefix
> sums, a.k.a. the cumulative sums. To do this I need to keep both the
> dataset with the vectors and the dataset where I store the sums.
>
> In the Scala API, could I use a dataset inside the iteration just via
> closures, without passing it in as the solution set or workset?
>
> Maybe a flag to disable the check would be suitable?
>
> Thanks!
> Cheers,
> Max
>
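For reference, the result Max describes can be stated as a sequential scan. This plain-Scala sketch assumes a hypothetical `Vec(id, values)` case class standing in for the thread's own `Vector` class, that ids give the summation order, and that all vectors have length `dimensions`; sums are taken element-wise.

```scala
object PrefixSums {
  // Hypothetical stand-in for the thread's Vector class: an id plus values.
  final case class Vec(id: Int, values: Array[Double])

  // Element-wise addition of two equally long value arrays.
  private def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // Cumulative sums in id order: output i is the element-wise sum of the
  // first i + 1 input vectors and keeps the id of the i-th vector.
  def cumulative(vectors: Seq[Vec], dimensions: Int): Seq[Vec] = {
    val sorted = vectors.sortBy(_.id)
    val sums = sorted.scanLeft(Array.fill(dimensions)(0.0))((acc, v) => add(acc, v.values)).tail
    sorted.zip(sums).map { case (v, s) => Vec(v.id, s) }
  }
}
```

The `scanLeft` seed is the zero vector, mirroring the `Vector.zeros(config.dimensions)` element in Max's code; `.tail` drops that seed from the output.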
> On Tue, Oct 7, 2014 at 4:34 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Hey!
>>
>> Is the algorithm you are using actually a delta iteration? If you actually
>> do not use the solution set, can you model it as a bulk iteration?
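A rough sketch of what the bulk-iteration route could look like, simulated in plain Scala rather than with Flink's API. In a bulk iteration only one data set flows from superstep to superstep, so the finished sums have to travel inside it, tagged so that each superstep can tell pending vectors, the running sum and results apart. `iterateBulk`, `Rec` and its cases are hypothetical; the sketch also assumes exactly one superstep per input value.

```scala
object BulkWorkaround {
  // Hypothetical tagged records carried through the iteration.
  sealed trait Rec
  final case class Pending(id: Int, value: Double) extends Rec // not yet summed
  final case class Running(sum: Double) extends Rec            // current partial sum
  final case class Done(id: Int, sum: Double) extends Rec      // finished prefix sum

  // Plain-Scala stand-in for a bulk iteration: the whole data set is
  // replaced by the step function's output in every superstep.
  def iterateBulk[T](initial: Seq[T], iterations: Int)(step: (Int, Seq[T]) => Seq[T]): Seq[T] =
    (1 to iterations).foldLeft(initial)((data, superstep) => step(superstep, data))

  def prefixSums(values: Seq[Double]): Seq[Done] = {
    val init: Seq[Rec] = Running(0.0) +: values.zipWithIndex.map { case (v, i) => Pending(i + 1, v) }
    val result = iterateBulk(init, values.size) { (k, data) =>
      val old = data.collect { case Running(s) => s }.sum
      val current = data.collect { case Pending(id, v) if id == k => v }.sum
      val sum = old + current
      // Drop the consumed vector and the old running sum; keep everything else.
      val kept = data.filter {
        case Pending(id, _) => id != k
        case Running(_)     => false
        case Done(_, _)     => true
      }
      kept :+ Running(sum) :+ Done(k, sum)
    }
    result.collect { case d: Done => d }.sortBy(_.id)
  }
}
```

The price is that every finished result is carried through all remaining supersteps, which is the overhead a primitive that collects per-iteration output would avoid.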
>>
>> If you actually need the solution set to accumulate data, we can probably
>> deactivate that check in the compiler. As far as I remember, there is no
>> requirement in the runtime to join with the solution set. The check is meant
>> to help programmers who forgot the join...
>>
>> Greetings,
>> Stephan
>>
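A minimal in-memory model of the delta-iteration contract, assuming the usual semantics: the solution set is keyed, each superstep's delta is merged into it by key (replacing records with the same key), and the loop ends when the workset is empty or after `maxIterations`. `DeltaModel.iterateDelta` is a hypothetical helper, not Flink's implementation. The step function in the usage receives the solution set but never reads it, which shows, for this toy model only, that nothing in the update rule itself depends on reading the solution set.

```scala
object DeltaModel {
  // Hypothetical in-memory model of a delta iteration (not Flink's code).
  // The step function gets the superstep number, the current solution set
  // and the workset, and returns (delta, nextWorkset).
  def iterateDelta[K, S, W](initialSolution: Map[K, S], initialWorkset: Seq[W], maxIterations: Int)(
      key: S => K)(step: (Int, Map[K, S], Seq[W]) => (Seq[S], Seq[W])): Map[K, S] = {
    var solution = initialSolution
    var workset = initialWorkset
    var superstep = 1
    // Terminate on an empty workset or after maxIterations supersteps.
    while (superstep <= maxIterations && workset.nonEmpty) {
      val (delta, next) = step(superstep, solution, workset)
      // Merge the delta by key: a record with an existing key replaces it.
      solution = solution ++ delta.map(s => key(s) -> s)
      workset = next
      superstep += 1
    }
    solution
  }

  // Usage in the spirit of the thread's step function: the running sum sits
  // in the workset under id -1 and the solution set is written, never read.
  def prefixSums(values: Seq[Double]): Map[Int, Double] = {
    val workset = (-1, 0.0) +: values.zipWithIndex.map { case (v, i) => (i + 1, v) }
    val solution = iterateDelta[Int, (Int, Double), (Int, Double)](Map.empty, workset, 1000000)(_._1) {
      (k, _, ws) =>
        val oldSum = ws.collect { case (-1, s) => s }.sum
        val current = ws.collect { case (id, v) if id == k => v }
        if (current.isEmpty) (Seq.empty, Seq.empty)
        else {
          val sum = oldSum + current.sum
          (Seq((k, sum)), (-1, sum) +: ws.filter(_._1 > k))
        }
    }
    solution.map { case (id, (_, sum)) => id -> sum }
  }
}
```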
>>
>>
>> On Tue, Oct 7, 2014 at 3:13 PM, Maximilian Alber
>> <alber.maximilian@gmail.com> wrote:
>>>
>>> Hi Flinksters!
>>>
>>> I would like to use the iterateDelta function. I don't need the solution set
>>> inside the step function, because I generate different values out of the
>>> workset. Unfortunately, the compiler of the development version doesn't
>>> like that. Is there a workaround?
>>>
>>> The code:
>>>
>>> val residual_2a = residual_2 union
>>>   env.fromCollection(Seq(Vector.zeros(config.dimensions)))
>>> val emptyDataSet = env.fromCollection[Vector](Seq())
>>> val cumSum = emptyDataSet.iterateDelta(residual_2a, 1000000, Array("id")) {
>>>   (solutionset, workset) =>
>>>     val old_sum = workset filter {_.id == -1}
>>>     val current = workset filter (new RichFilterFunction[Vector] {
>>>       def filter(x: Vector) =
>>>         x.id == getIterationRuntimeContext.getSuperstepNumber
>>>     })
>>>     val residual_2 = workset filter {_.id != -1}
>>>     val sum = VectorDataSet.add(old_sum, current)
>>>
>>>     (sum map (new RichMapFunction[Vector, Vector] {
>>>       def map(x: Vector) =
>>>         new Vector(getIterationRuntimeContext.getSuperstepNumber, x.values)
>>>     }),
>>>     residual_2 union sum)
>>> }
>>>
>>> The error:
>>>
>>> org.apache.flink.compiler.CompilerException: Error: The step function does not reference the solution set.
>>>     at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:868)
>>>     at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:622)
>>>     at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
>>>     at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:202)
>>>     at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:286)
>>>     at org.apache.flink.api.common.Plan.accept(Plan.java:281)
>>>     at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:517)
>>>     at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
>>>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
>>>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
>>>     at org.apache.flink.client.program.Client.run(Client.java:285)
>>>     at org.apache.flink.client.program.Client.run(Client.java:230)
>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>>
>>> Thanks!
>>> Cheers,
>>> Max
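The exception is raised while the plan is compiled, before anything runs. Here is a toy model of such a check in plain Scala: describe the step function's operators as a small graph and ask whether the solution-set placeholder is reachable from either output. `Node`, `referencesSolutionSet` and the node labels are hypothetical and much simpler than Flink's optimizer; the model only shows why a step function that reads nothing but the workset fails a check worded like this one.

```scala
object StepFunctionCheck {
  // Hypothetical plan node: an operator label plus the nodes it reads from.
  final case class Node(name: String, inputs: List[Node] = Nil)

  // Placeholders for the two inputs of a delta iteration's step function.
  val SolutionSet: Node = Node("solution set")
  val Workset: Node = Node("workset")

  // True if the solution-set placeholder is reachable from any output,
  // walking backwards along input edges (depth-first, no memoization).
  def referencesSolutionSet(outputs: Seq[Node]): Boolean = {
    def reaches(n: Node): Boolean = (n eq SolutionSet) || n.inputs.exists(reaches)
    outputs.exists(reaches)
  }

  // Rough shape of the step function quoted above: both outputs derive
  // only from the workset, so a check like this one rejects it.
  val oldSum: Node = Node("filter id == -1", List(Workset))
  val current: Node = Node("filter id == superstep", List(Workset))
  val residual: Node = Node("filter id != -1", List(Workset))
  val sum: Node = Node("add", List(oldSum, current))
  val delta: Node = Node("map: set id to superstep", List(sum))
  val nextWorkset: Node = Node("union", List(residual, sum))
}
```

Adding any operator that reads `SolutionSet` on the path to an output, such as a join, is enough to satisfy this toy check.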
>>
>>
>
