From: Florian Heyl
To: user@flink.apache.org
Date: Thu, 8 Oct 2015 14:54:22 +0200
Subject: data sink stops method

Hi,
I need some help figuring out why one method in my pipeline stops the execution on HDFS.
I am working with Flink 0.10-SNAPSHOT, and the code is below. On HDFS the method stops at the collect call (JoinPredictionAndOriginal.collect), which creates a data sink, so the program stops before the two output files at the end can be written. What am I missing?
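A plain-Scala sketch of the execution model involved, assuming Flink DataSet semantics in which collect() triggers a job execution right away, runs every sink defined up to that point, and leaves sinks defined afterwards (such as the two writeAsCsv calls in CalcPredError) waiting for a later env.execute(). ToyEnv, addSink and the sink names are hypothetical stand-ins, not Flink API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model of a lazily built plan; it is not Flink's API.
final class ToyEnv {
  private val pendingSinks = ArrayBuffer.empty[String] // sinks defined but not yet run
  val executedSinks = ArrayBuffer.empty[String]        // sinks that have actually run

  // Defining a sink only records it in the plan; nothing runs yet.
  def addSink(name: String): Unit = pendingSinks += name

  // Runs every sink defined so far, then clears the plan.
  // (Flink likewise refuses to execute a plan that has no sinks.)
  def execute(): Unit = {
    if (pendingSinks.isEmpty) throw new IllegalStateException("no sinks defined")
    executedSinks ++= pendingSinks
    pendingSinks.clear()
  }

  // Eager collect: adds its own sink and executes the plan immediately.
  def collect[T](data: Seq[T]): Seq[T] = {
    addSink("collect")
    execute()
    data
  }
}

object CollectThenWrite {
  def main(args: Array[String]): Unit = {
    val env = new ToyEnv
    val pairs = env.collect(Seq((1.0, 1.0), (2.0, 3.0))) // executes the plan immediately
    env.addSink("outputPath")  // stands in for original.writeAsCsv(...)
    env.addSink("outputPath2") // stands in for transformPred.writeAsCsv(...)
    println(pairs.length)                    // 2
    println(env.executedSinks.mkString(",")) // collect
    env.execute()                            // the two file sinks run only here
    println(env.executedSinks.mkString(",")) // collect,outputPath,outputPath2
  }
}
```

In this model the two file sinks only run once execute() is called after they have been defined; under the stated assumption, the same holds for the writeAsCsv sinks in CalcPredError.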
Thank you for your time.

Best wishes,
Flo

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math._
import scala.math.{pow, sqrt}

// method calculates the prediction error
def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
                  outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {

  var iter = 0

  // round each prediction (HALF_UP) and use the counter iter as its label
  val transformPred = predictions
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
    }

  iter = 0

  // use the counter iter as the label of each original value
  val tranformOrg = original
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(tuple.label))
    }

  // pair prediction and original values whose labels match
  val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
    (l, r) => (l.vector.head._2, r.vector.head._2)
  }

  // fetch the joined pairs to the client
  val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect

  val N = list_JoinPredictionAndOriginal.length

  val residualSum = list_JoinPredictionAndOriginal.map {
    num => pow((num._1 - num._2), 2)
  }.sum

  // root-mean-square error
  val predictionError = sqrt(residualSum / N)

  original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
  transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

  (predictions, predictionError)
}
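The value computed in CalcPredError is the root-mean-square error, sqrt(sum over i of (p_i - o_i)^2 / N), where p_i is the rounded prediction and o_i the original label carrying the same index. The plain-Scala sketch below reproduces that arithmetic on local collections, assuming the pairing is purely by position (done here with zip rather than a Flink join). PredictionError, roundHalfUp and rmse are illustrative names, not part of the original code.

```scala
object PredictionError {
  // Rounds to the nearest whole number with ties going up (away from zero),
  // the same BigDecimal(x).setScale(0, HALF_UP) step used in CalcPredError.
  def roundHalfUp(x: Double): Double =
    BigDecimal(x).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble

  // Root-mean-square error over (prediction, original) pairs:
  // sqrt( sum_i (p_i - o_i)^2 / N )
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    val residualSum = pairs.map { case (p, o) => math.pow(p - o, 2) }.sum
    math.sqrt(residualSum / pairs.length)
  }

  def main(args: Array[String]): Unit = {
    val predictions = Seq(0.6, 2.5, 3.2, 4.0)
    val originals   = Seq(1.0, 3.0, 3.0, 6.0)
    // Pair the i-th rounded prediction with the i-th original by position.
    val pairs = predictions.map(roundHalfUp).zip(originals)
    println(pairs)
    println(rmse(pairs)) // 1.0
  }
}
```

Here the rounded predictions are 1, 3, 3 and 4, so only the last pair contributes a squared residual (4), giving sqrt(4 / 4) = 1.0. In the Flink version the index instead comes from the captured var iter: each parallel task of a map operator runs its own deserialized copy of the function and therefore counts from its own copy of iter, so the labels are only guaranteed to be unique (and the join one-to-one) when the maps run with parallelism 1.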