flink-user mailing list archives

From Florian Heyl <f.h...@gmx.de>
Subject Re: data sink stops method
Date Thu, 08 Oct 2015 18:58:44 GMT
Hey Stephan and Pieter,
That was what I thought too, so I simply changed the code like this:

original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)

env.execute()

transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

env.execute()

But it still does not execute the two commands.
Thank you for your time.

Flo


Am 08.10.2015 um 17:41 schrieb Stephan Ewen <sewen@apache.org>:

> Yes, sinks in Flink are lazy and do not trigger execution automatically. We made this choice to allow multiple concurrent sinks (splitting the streams and writing to many outputs concurrently). That requires explicit execution triggers (env.execute()).
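> 
> For example, a minimal sketch (not your pipeline; paths and names are made up):
> 
>   import org.apache.flink.api.scala._
>   import org.apache.flink.core.fs.FileSystem.WriteMode
> 
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val data = env.fromElements((1, "a"), (2, "b"), (3, "c"))
> 
>   // two lazy sinks: nothing has been executed yet
>   data.writeAsCsv("hdfs:///tmp/out1", "\n", " ", WriteMode.OVERWRITE)
>   data.map(t => (t._2, t._1)).writeAsCsv("hdfs:///tmp/out2", "\n", " ", WriteMode.OVERWRITE)
> 
>   // a single explicit trigger runs both sinks in one job
>   env.execute()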
> 
> The exceptions are, as mentioned, the "eager" methods "collect()", "count()" and "print()". They need to be eager because the driver program needs, for example, the "count()" value before it can possibly progress...
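> 
> With the same sketch as above, each of the eager methods triggers a job on its own:
> 
>   val n: Long = data.count()   // eager: runs the plan to compute the count
>   val rows = data.collect()    // eager: runs the plan and brings the result to the driver
>   data.print()                 // eager: prints the data set on the driver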
> 
> Stephan
> 
> 
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phameete@gmail.com> wrote:
> Hi Florian,
> 
> I believe that when you call JoinPredictionAndOriginal.collect the environment will execute your program up until that point. The Csv writes are after this point, so in order to execute these steps I think you would have to call <env>.execute() after the Csv writes to trigger the execution (where <env> is the name of the variable pointing to your ExecutionEnvironment).
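> 
> In code that would look roughly like this (a sketch, assuming env points to your ExecutionEnvironment):
> 
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect // eager, runs a first job
>   // ... error computation ...
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)        // lazy sink
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)  // lazy sink
>   env.execute()                                                          // runs a second job with the two sinks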
> 
> I hope this helps :-)
> 
> - Pieter
> 
> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.heyl@gmx.de>:
> Hi,
> I need some help to figure out why one method in my pipeline stops the execution on the hdfs.
> I am working with the 10.0-SNAPSHOT and the code is the following (see below). The method stops on the hdfs when the collect method (JoinPredictionAndOriginal.collect) is called, which creates a data sink, and this is why the program stops before the two output files at the end can be created. What am I missing?
> Thank you for your time.
> 
> Best wishes,
> Flo
> 
> // method calculates the prediction error
> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>      outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector],
Double) ={
> 
>   var iter = 0
> 
>   val transformPred = predictions
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>   }
> 
>   iter = 0
> 
>   val tranformOrg = original
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(tuple.label))
>   }
> 
>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0)
{
>     (l, r) => (l.vector.head._2, r.vector.head._2)
>   }
> 
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
> 
>   val N = list_JoinPredictionAndOriginal.length
> 
>   val residualSum = list_JoinPredictionAndOriginal.map {
>     num => pow((num._1 - num._2), 2)
>   }.sum
> 
>   val predictionError = sqrt(residualSum / N)
> 
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
> 
>   (predictions,predictionError)
> }

