spark-dev mailing list archives

From "Ulanov, Alexander"
Subject RE: BlockMatrix multiplication
Date Wed, 15 Jul 2015 01:23:40 GMT
Hi Burak,

Thank you for the explanation! I will try to make a diagonal block matrix and report the results to you.

A column- or row-based partitioner makes sense to me, because it is directly analogous to the
column- or row-based storage of matrices that is used in BLAS.

Best regards, Alexander

From: Burak Yavuz
Sent: Tuesday, July 14, 2015 10:14 AM
To: Ulanov, Alexander
Cc: Rakesh Chalasani;
Subject: Re: BlockMatrix multiplication

Hi Alexander,

From your example code, using the GridPartitioner, you will have 1 column and 5 rows. When
you perform an A^T*A multiplication, you will generate a separate GridPartitioner with 5 columns
and 5 rows. That is why you are observing a huge shuffle. If you generated a diagonal block
matrix as an example (5x5), you should not observe any shuffle.
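
A minimal sketch of the diagonal-block experiment suggested above, for anyone who wants to try it. It assumes a spark-shell session where `sc` is the SparkContext; the block size of 100 and the names `diagBlocks`, `diag` and `product` are illustrative choices, not taken from the thread. Whether the shuffle actually disappears is something to check in the Spark UI for your version; the sketch only builds the matrices.

```scala
import java.util.Random
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Assumption: running in spark-shell, so `sc` already exists.
val numBlocks = 5     // 5x5 grid of blocks, as in the suggestion above
val blockSize = 100   // hypothetical; much smaller than the 10000 used in the thread

// Populate only the diagonal blocks (i, i); one block per partition.
val diagBlocks = sc.parallelize(0 until numBlocks, numBlocks).map { i =>
  ((i, i), Matrices.rand(blockSize, blockSize, new Random(i)))
}

val diag = new BlockMatrix(diagBlocks, blockSize, blockSize).cache()
diag.validate()  // throws if block indices or block sizes are inconsistent

// A^T * A for the block-diagonal matrix; count() forces the computation.
val product = diag.transpose.multiply(diag)
println("result blocks: " + product.blocks.count())
```

Comparing the shuffle read/write sizes of this job with those of the 5x1 layout should show whether the partitioner is the cause.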

Basically, your example causes the worst kind of shuffle. We can implement RowBasedPartitioning,
and ColumnBasedPartitioning for optimization, but we didn't initially see it necessary to
expose the partitioners to users, and didn't add them (you can find the old implementations

Hope that helps!


On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander wrote:
Hi Rakesh,

I am not interested in a particular case of A^T*A. This case is a handy setup so I don’t
need to create another matrix and force the blocks to co-locate. Basically, I am trying to
understand the effectiveness of BlockMatrix for multiplication of distributed matrices. It
seems that I am missing something or using it wrong.

Best regards, Alexander

From: Rakesh Chalasani
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Subject: Re: BlockMatrix multiplication

Hi Alexander:

Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected. Check

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which might be causing
the shuffle, even though in your special case it need not happen. But, from what I understood from
your code, I don't think this is an issue, since your special case can be handled using computeGramianMatrix
on RowMatrix. Is there a reason you did not use that?
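
For reference, a small sketch of the RowMatrix route mentioned above. In the MLlib API the method is `RowMatrix.computeGramianMatrix()`, and it returns a local matrix on the driver, so it only suits cases where the number of columns is modest. The 3x2 input and the names `rowVectors`, `mat` and `gram` are made up for illustration, and `sc` is assumed to be the SparkContext of a spark-shell session.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Assumption: running in spark-shell, so `sc` already exists.
// A tiny 3x2 matrix A, stored one row per RDD element.
val rowVectors = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0)))

val mat = new RowMatrix(rowVectors)

// Gramian A^T * A, returned as a local 2x2 matrix.
// Mathematically this is [[35, 44], [44, 56]] for the rows above.
val gram = mat.computeGramianMatrix()
println(gram)
```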


On Tue, Jul 14, 2015 at 11:03 AM, Ulanov, Alexander wrote:
Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate partition. Each block
of the transposed matrix is also in a separate partition. The partition numbers are the same for
the blocks that undergo multiplication. Each partition is on a separate worker. Basically,
I want to force each worker to multiply only 2 blocks. This should be the optimal configuration
for multiplication, as far as I understand. Having several blocks in each partition, as you
suggested, is not optimal, is it?

Best regards, Alexander

BlockMatrix stores the data as key->Matrix pairs, and multiply does a reduceByKey operation,
aggregating matrices per key. Since you said each block resides in a separate partition,
reduceByKey might effectively be shuffling all of the data. A better way to go about this
is to allow multiple blocks within each partition, so that reduceByKey does a local reduce
before aggregating across nodes.


On Mon, Jul 13, 2015 at 9:24 PM, Ulanov, Alexander wrote:
Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark. My test is as follows: 1) create
a matrix of N blocks, so that each row of the block matrix contains only 1 block and each block
resides in a separate partition on a separate node, 2) transpose the block matrix, and 3) multiply
the transposed matrix by the original non-transposed one. This should preserve data locality,
so there should be no need for a shuffle. However, I observe a huge shuffle with a block matrix
of size 50000x10000 and a block size of 10000x10000, i.e. 5 blocks per matrix. Could you suggest what
is wrong?

My setup is Spark 1.4, one master and 5 worker nodes, each a Xeon 2.2 with 16 GB RAM.
Below is the test code:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize
val rdd = sc.parallelize( {
                for(i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
                }, parallelism).map( coord => (coord, Matrices.rand(blockSize, blockSize,
val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
val mb = bm.transpose.cache()
val t = System.nanoTime()
val ata = mb.multiply(bm)
println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)

Best regards, Alexander
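
The test code above is truncated in the archive, and as shown it times only the lazy `multiply` call. The following is a hedged reconstruction for readers who want to reproduce the experiment, not the author's exact code. Assumptions: `sc` comes from a spark-shell session, the generator passed to `Matrices.rand` is a fresh `java.util.Random` (the original argument is cut off), the block size is reduced to 1000 to keep the run small, and the `count()` calls are additions that force evaluation so the timing covers the actual multiplication.

```scala
import java.util.Random
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Assumption: running in spark-shell, so `sc` already exists.
val parallelism = 5
val blockSize = 1000              // assumption: the thread used 10000
val rows = parallelism * blockSize
val columns = blockSize
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize

// One random dense block per (row, column) block coordinate, one per partition.
val coords = for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
val rdd = sc.parallelize(coords, parallelism).map { coord =>
  (coord, Matrices.rand(blockSize, blockSize, new Random()))  // RNG is an assumption
}

val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
val mb = bm.transpose.cache()

// Materialize both cached matrices so that block generation is not timed.
bm.blocks.count()
mb.blocks.count()

val t = System.nanoTime()
val ata = mb.multiply(bm)
val numResultBlocks = ata.blocks.count()  // action: forces the multiplication
println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)
```

With this layout the transposed matrix has 1 row of 5 blocks and the original has 5 rows of 1 block, so the product is a single block of size blockSize x blockSize.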
