flink-user mailing list archives

From Florian Heyl <f.h...@gmx.de>
Subject data sink stops method
Date Thu, 08 Oct 2015 12:54:22 GMT
Hi,
I need help figuring out why one method in my pipeline stops execution when it runs on
HDFS.
I am working with 10.0-SNAPSHOT, and the code is below. The method stops at the call to
collect (JoinPredictionAndOriginal.collect), which creates a data sink, so the program stops
before the two output files at the end can be created. What am I missing?
Thank you for your time.

Best wishes,
Flo

// Calculates the prediction error (root-mean-square error) between predictions and originals
def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
    outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {

  var iter = 0

  val transformPred = predictions
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
    }

  iter = 0

  val transformOrg = original
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(tuple.label))
    }

  val JoinPredictionAndOriginal = transformPred.join(transformOrg).where(0).equalTo(0) {
    (l, r) => (l.vector.head._2, r.vector.head._2)
  }

  val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect

  val N = list_JoinPredictionAndOriginal.length

  val residualSum = list_JoinPredictionAndOriginal.map {
    num => pow((num._1 - num._2), 2)
  }.sum

  val predictionError = sqrt(residualSum / N)

  original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
  transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

  (predictions, predictionError)
}
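
The error computed after the collect call above is a root-mean-square error over the joined (prediction, original) pairs. As a minimal sketch of that arithmetic only, here is the same calculation on plain Scala collections, without Flink. The names PredictionErrorSketch, rmse and roundHalfUp and the sample values are hypothetical and are not part of the pipeline above.

```scala
import scala.math.{pow, sqrt}

object PredictionErrorSketch {

  // Root-mean-square error over (prediction, original) pairs:
  // sqrt(sum((p - o)^2) / N), the same formula as in CalcPredError.
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    val residualSum = pairs.map { case (p, o) => pow(p - o, 2) }.sum
    sqrt(residualSum / pairs.length)
  }

  // Same rounding as in the first mapper: HALF_UP to zero decimal places.
  def roundHalfUp(x: Double): Double =
    BigDecimal(x).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble

  def main(args: Array[String]): Unit = {
    // Made-up sample data, for illustration only.
    val predictions = Seq(0.4, 1.6, 2.5)
    val originals = Seq(0.0, 2.0, 1.0)

    // Pair by position here; the pipeline above pairs by a join on the label field.
    val pairs = predictions.map(roundHalfUp).zip(originals)
    println(rmse(pairs))
  }
}
```

This only mirrors the arithmetic on local data. It says nothing about how the distributed job is scheduled or why it stops.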




 