flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: data sink stops method
Date Thu, 15 Oct 2015 10:24:24 GMT
Could you post a minimal example of your code where the problem is
reproducible? I assume that there has to be another problem because
env.execute should actually trigger the execution.

Cheers,

Till
‚Äč

On Thu, Oct 8, 2015 at 8:58 PM, Florian Heyl <f.heyl@gmx.de> wrote:

> Hey Stephan and Pieter,
> That was the same what I thought, so I simply changed the code like this:
>
> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> But he still not execute the two commands.
> Thank you for your time.
>
> Flo
>
>
> Am 08.10.2015 um 17:41 schrieb Stephan Ewen <sewen@apache.org>:
>
> Yes, sinks in Flink are lazy and do not trigger execution automatically.
> We made this choice to allow multiple concurrent sinks (spitting the
> streams and writing to many outputs concurrently). That requires explicit
> execution triggers (env.execute()).
>
> The exceptions are, as mentioned, the "eager" methods "collect()",
> "count()" and "print()". They need to be eager, because the driver program
> needs for example the "count()" value before it can possibly progress...
>
> Stephan
>
>
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phameete@gmail.com> wrote:
>
>> Hi Florian,
>>
>> I believe that when you call *JoinPredictionAndOriginal.collect* the
>> environment will execute your program up until that point. The Csv writes
>> are after this point, so in order to execute these steps I think you would
>> have to call *<env>.execute()* after the Csv writes to trigger the
>> execution (where <env> is the name of the variable pointing to your
>> ExecutionEnvironment).
>>
>> I hope this helps :-)
>>
>> - Pieter
>>
>> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.heyl@gmx.de>:
>>
>>> Hi,
>>> I need some help to figure out why one method of mine in a pipeline
>>> stops the execution on the hdfs.
>>> I am working with the 10.0-SNAPSHOT and the code is the following (see
>>> below). The method stops on the hdfs by calling the collect method (
>>> JoinPredictionAndOriginal.collect) creating a data sink, which is why
>>> the program stops before the two output files at the ends can be created.
>>> What am I missing?
>>> Thank you for your time.
>>>
>>> Best wishes,
>>> Flo
>>>
>>> // method calculates the prediction error
>>> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>>>      outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector],
Double) ={
>>>
>>>   var iter = 0
>>>
>>>   val transformPred = predictions
>>>     .map { tuple =>
>>>     iter = iter + 1
>>>     LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>>>   }
>>>
>>>   iter = 0
>>>
>>>   val tranformOrg = original
>>>     .map { tuple =>
>>>     iter = iter + 1
>>>     LabeledVector(iter, DenseVector(tuple.label))
>>>   }
>>>
>>>
>>>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0)
{
>>>     (l, r) => (l.vector.head._2, r.vector.head._2)
>>>   }
>>>
>>>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>>>
>>>   val N = list_JoinPredictionAndOriginal.length
>>>
>>>   val residualSum = list_JoinPredictionAndOriginal.map {
>>>     num => pow((num._1 - num._2), 2)
>>>   }.sum
>>>
>>>   val predictionError = sqrt(residualSum / N)
>>>
>>>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>>>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>>>
>>>   (predictions,predictionError)
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>

Mime
View raw message