flink-user mailing list archives

From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: DistributedMatrix in Flink
Date Thu, 04 Feb 2016 16:22:45 GMT
Hi Lydia,

Spark and Flink are not identical. Thus, you’ll find concepts in both systems
which don’t have a corresponding counterpart in the other system. For
example, rows.context.broadcast(v) broadcasts the value v so that you can
use it on all Executors. Flink follows a slightly different concept when
you broadcast values. In Flink you always broadcast the contents of
DataSets. That way you avoid collecting the result on some central node
from which it is then broadcast.
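
A minimal sketch of a broadcast set, assuming the Scala DataSet API
(flink-scala) is on the classpath. The object name BroadcastSketch, the
helper dot, the broadcast name "v" and the sample values are made up for
illustration, and rows are plain Array[Double] instead of the Vector type
from your code. The vector v is wrapped in a one-element DataSet and
attached to the map operator with withBroadcastSet:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

object BroadcastSketch {

  // Hypothetical helper: dot product of two equally sized arrays.
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data: two rows and a vector v held in a one-element DataSet.
    val rows: DataSet[Array[Double]] = env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))
    val v: DataSet[Array[Double]] = env.fromElements(Array(0.5, 0.5))

    val dots: DataSet[Double] = rows
      .map(new RichMapFunction[Array[Double], Double] {
        private var vec: Array[Double] = _

        // Called once per parallel task: fetch the broadcast contents by name.
        override def open(parameters: Configuration): Unit = {
          vec = getRuntimeContext.getBroadcastVariable[Array[Double]]("v").asScala.head
        }

        override def map(row: Array[Double]): Double = dot(row, vec)
      })
      .withBroadcastSet(v, "v") // ship the contents of v to every parallel instance

    dots.print()
  }
}
```

The broadcast contents arrive as a java.util.List in open(), so nothing
has to be collected on the client first.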

treeAggregate is an aggregation operation which is partly executed on
the cluster: partial results are combined on the executors before the
final result reaches the driver. It is similar to a combinable reduce
operation in Flink. However, you can choose an arbitrary result type
(similar to a fold operation compared to a reduce operation). You can do
the same with Flink if you first apply a combineGroup function on the
DataSet and then a reduce function.
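
A rough sketch of the combineGroup + reduce pattern for your
multiplyGramianMatrixBy, again assuming the Scala DataSet API. The names
TreeAggregateSketch, seqOp and combOp are only illustrative, rows are
plain Array[Double] instead of Breeze vectors, and v is captured in the
closure to keep the example short (it could equally be a broadcast set):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object TreeAggregateSketch {

  // Illustrative seqOp: acc += (r . v) * r, updating acc in place.
  def seqOp(acc: Array[Double], r: Array[Double], v: Array[Double]): Array[Double] = {
    val a = r.zip(v).map { case (x, y) => x * y }.sum
    var i = 0
    while (i < acc.length) {
      acc(i) += a * r(i)
      i += 1
    }
    acc
  }

  // Illustrative combOp: element-wise sum of two partial results (returns a new array).
  def combOp(u1: Array[Double], u2: Array[Double]): Array[Double] =
    u1.zip(u2).map { case (x, y) => x + y }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data.
    val n = 2
    val v = Array(0.5, 0.5)
    val rows: DataSet[Array[Double]] = env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))

    // Step 1: pre-aggregate locally; each call may see only part of the data
    // and emits one partial result of the chosen result type.
    val partials: DataSet[Array[Double]] = rows.combineGroup {
      (it: Iterator[Array[Double]], out: Collector[Array[Double]]) =>
        val acc = Array.fill(n)(0.0)
        it.foreach(r => seqOp(acc, r, v))
        out.collect(acc)
    }

    // Step 2: merge the partial results into the final vector.
    val result: DataSet[Array[Double]] = partials.reduce((u1, u2) => combOp(u1, u2))

    result.map(_.mkString(", ")).print()
  }
}
```

The combineGroup step plays the role of seqOp with the zero value, and
the reduce step plays the role of combOp. For the two sample rows the
product A^T A v works out to (12, 17).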

Cheers,
Till

On Thu, Feb 4, 2016 at 3:13 PM, Lydia Ickler <icklerly@googlemail.com>
wrote:

> Hi all,
>
> as mentioned before I am trying to import the RowMatrix from Spark to
> Flink…
>
> In the code I already ran into a dead end… In the function multiplyGramianMatrixBy()
> (see end of mail) there is the line:
> rows.context.broadcast(v) (rows is a DataSet[Vector])
> What exactly is this line doing? Does it fill the „content“ of v into the
> variable *rows*?
> And another question:
> What is the function treeAggregate doing? And how would you tackle
> a „copy“ of that in Flink?
>
> Thanks in advance!
> Best regards,
> Lydia
>
>
> private[flink] def multiplyGramianMatrixBy(v: DenseVector[Double]): DenseVector[Double] = {
>   val n = numCols().toInt
>
>   val vbr = rows.context.broadcast(v)
>
>   rows.treeAggregate(BDV.zeros[Double](n))(
>     seqOp = (U, r) => {
>       val rBrz = r.toBreeze
>       val a = rBrz.dot(vbr.data)
>       rBrz match {
>         // use specialized axpy for better performance
>         case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
>         case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
>         case _ => throw new UnsupportedOperationException(
>           s"Do not support vector operation from type ${rBrz.getClass.getName}.")
>       }
>       U
>     }, combOp = (U1, U2) => U1 += U2)
> }
>
>
>
