flink-user mailing list archives

From Lydia Ickler <ickle...@googlemail.com>
Subject DistributedMatrix in Flink
Date Thu, 04 Feb 2016 14:13:58 GMT
Hi all,

As mentioned before, I am trying to port the RowMatrix from Spark to Flink…

In the code I have already run into a dead end… In the function multiplyGramianMatrixBy() (see
end of mail) there is the line:
rows.context.broadcast(v) (rows is a DataSet[Vector]).
What exactly is this line doing? Does it fill the "content" of v into the variable rows?
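
My working assumption so far: rows.context.broadcast(v) does not touch rows at all.
rows.context is just the SparkContext, and broadcast(v) ships a read-only copy of v to
every worker, where the closure later reads it through the broadcast handle. If that is
right, the closest counterpart I have found in Flink's DataSet API is a broadcast set
attached via withBroadcastSet. A minimal sketch of what I have in mind (the name
"gramianVector" and the Array[Double] rows are only placeholders of mine, not anything
from the Spark code):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Sketch only: make a small vector v available to every parallel instance of a
// map over the rows, in place of Spark's rows.context.broadcast(v).
val env = ExecutionEnvironment.getExecutionEnvironment

val rows: DataSet[Array[Double]] = env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))
val v: DataSet[Array[Double]]    = env.fromElements(Array(0.5, 0.5))

val dotProducts: DataSet[Double] = rows
  .map(new RichMapFunction[Array[Double], Double] {
    private var vLocal: Array[Double] = _

    override def open(parameters: Configuration): Unit = {
      // the broadcast set is materialized on every worker; here it holds exactly one element
      vLocal = getRuntimeContext.getBroadcastVariable[Array[Double]]("gramianVector").get(0)
    }

    override def map(row: Array[Double]): Double =
      row.zip(vLocal).map { case (a, b) => a * b }.sum   // row . v
  })
  .withBroadcastSet(v, "gramianVector")   // the name is an arbitrary placeholder

So if I understand it correctly, the data flows the other way around: v is copied to
where the rows live, and rows itself is never filled with anything. Is that correct?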
And another question:
What is the function treeAggregate doing? And how would you tackle a "copy" of that in
Flink?
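
For treeAggregate my current reading is: it behaves like aggregate, i.e. a fold with
seqOp inside each partition followed by combOp to merge the per-partition results, only
that the merging is done in a multi-level tree instead of all at once on the driver.
Since the combOp here (U1 += U2) is just an associative vector sum, I would hope that a
plain map + reduce on a DataSet gives the same result in Flink, with Flink's combining
of partial results playing the role of the tree levels. A rough sketch under that
assumption (breeze vectors, and v captured in the closure instead of broadcast, purely
to keep the example short):

import breeze.linalg.{DenseVector => BDV}
import org.apache.flink.api.scala._

// Sketch only: the seqOp in the code below adds (r . v) * r into the accumulator U
// and the combOp sums the per-partition accumulators. The same result can be written
// as map (one contribution per row) + reduce (element-wise vector sum).
def multiplyGramianBy(rows: DataSet[BDV[Double]], v: BDV[Double]): DataSet[BDV[Double]] =
  rows
    .map(r => r * r.dot(v))   // contribution of one row: (r . v) * r
    .reduce(_ + _)            // sum of all contributions

// e.g.
// val env  = ExecutionEnvironment.getExecutionEnvironment
// val rows = env.fromElements(BDV(1.0, 2.0), BDV(3.0, 4.0))
// multiplyGramianBy(rows, BDV(0.5, 0.5)).print()

In the real port v would of course come in as a broadcast set as in the sketch above
rather than being captured in the closure. Does this map + reduce translation lose
anything that treeAggregate provides, apart from the explicit control over the tree depth?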

Thanks in advance!
Best regards, 
Lydia


// BDV, BSV and brzAxpy in the Spark original are breeze.linalg.DenseVector,
// breeze.linalg.SparseVector and breeze.linalg.axpy (aliased imports)
private[flink] def multiplyGramianMatrixBy(v: DenseVector[Double]): DenseVector[Double] = {
  val n = numCols().toInt

  val vbr = rows.context.broadcast(v)

  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (U, r) => {   // per-row step: add (r . v) * r into the accumulator U
      val rBrz = r.toBreeze
      val a = rBrz.dot(vbr.value)   // a broadcast variable is read via .value
      rBrz match {
        // use specialized axpy for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    }, combOp = (U1, U2) => U1 += U2)   // merge the per-partition partial sums
}

