Subject: Re: data sink stops method
From: Till Rohrmann
To: user@flink.apache.org
Date: Thu, 15 Oct 2015 12:24:24 +0200

Could you post a minimal example of your code where the problem is
reproducible? I assume that there has to be another problem, because
env.execute should actually trigger the execution.

Cheers,
Till

On Thu, Oct 8, 2015 at 8:58 PM, Florian Heyl <f.heyl@gmx.de> wrote:

> Hey Stephan and Pieter,
> That was what I thought as well, so I simply changed the code like this:
>
> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> But it still does not execute the two commands.
> Thank you for your time.
>
> Flo
>
>
> On 08.10.2015, at 17:41, Stephan Ewen <sewen@apache.org> wrote:
>
> Yes, sinks in Flink are lazy and do not trigger execution automatically.
> We made this choice to allow multiple concurrent sinks (splitting the
> streams and writing to many outputs concurrently). That requires explicit
> execution triggers (env.execute()).
>
> The exceptions are, as mentioned, the "eager" methods "collect()",
> "count()" and "print()". They need to be eager because the driver program
> needs, for example, the "count()" value before it can possibly progress.
>
> Stephan
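[A minimal sketch of the lazy-sink behavior Stephan describes, assuming the
Flink 0.10-era Scala DataSet API; the output paths and job name are made up
for illustration. Two lazy sinks are materialized by a single env.execute(),
while count() submits its own job eagerly:]

    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    object LazySinks {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements((1, "a"), (2, "b"), (3, "c"))

        // Two lazy sinks: these calls only extend the plan, nothing runs yet.
        data.writeAsCsv("hdfs:///tmp/out1", "\n", " ", WriteMode.OVERWRITE)
        data.map(t => (t._2, t._1))
            .writeAsCsv("hdfs:///tmp/out2", "\n", " ", WriteMode.OVERWRITE)

        // One execute() runs the plan and fills both sinks concurrently.
        env.execute("two sinks, one job")

        // count() is eager: it submits a separate job and returns the
        // result to the driver program immediately.
        println(s"count = ${data.count()}")
      }
    }

[Note that each eager call (collect/count/print) submits its own job, so
interleaving eager calls with lazy sinks causes multiple cluster round
trips.]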
The Csv write= s >> are after this point, so in order to execute these steps I think you wou= ld >> have to call *.execute()* after the Csv writes to trigger the >> execution (where is the name of the variable pointing to your >> ExecutionEnvironment). >> >> I hope this helps :-) >> >> - Pieter >> >> 2015-10-08 14:54 GMT+02:00 Florian Heyl : >> >>> Hi, >>> I need some help to figure out why one method of mine in a pipeline >>> stops the execution on the hdfs. >>> I am working with the 10.0-SNAPSHOT and the code is the following (see >>> below). The method stops on the hdfs by calling the collect method ( >>> JoinPredictionAndOriginal.collect) creating a data sink, which is why >>> the program stops before the two output files at the ends can be create= d. >>> What am I missing? >>> Thank you for your time. >>> >>> Best wishes, >>> Flo >>> >>> // method calculates the prediction error >>> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSe= t[LabeledVector], >>> outputPath: String, outputPath2: String, outputPath3: String): (Da= taSet[LabeledVector], Double) =3D{ >>> >>> var iter =3D 0 >>> >>> val transformPred =3D predictions >>> .map { tuple =3D> >>> iter =3D iter + 1 >>> LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0,= BigDecimal.RoundingMode.HALF_UP).toDouble)) >>> } >>> >>> iter =3D 0 >>> >>> val tranformOrg =3D original >>> .map { tuple =3D> >>> iter =3D iter + 1 >>> LabeledVector(iter, DenseVector(tuple.label)) >>> } >>> >>> >>> val JoinPredictionAndOriginal =3D transformPred.join(tranformOrg).whe= re(0).equalTo(0) { >>> (l, r) =3D> (l.vector.head._2, r.vector.head._2) >>> } >>> >>> val list_JoinPredictionAndOriginal =3D JoinPredictionAndOriginal.coll= ect >>> >>> val N =3D list_JoinPredictionAndOriginal.length >>> >>> val residualSum =3D list_JoinPredictionAndOriginal.map { >>> num =3D> pow((num._1 - num._2), 2) >>> }.sum >>> >>> val predictionError =3D sqrt(residualSum / N) >>> >>> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE) >>> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE) >>> >>> (predictions,predictionError) >>> } >>> >>> >>> >>> >>> >>> >>> >> >> > > --001a11c38d203e1e4e052222155f Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable