incubator-hama-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hama Wiki] Update of "MatMult" by udanax
Date Sat, 08 May 2010 02:07:53 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "MatMult" page has been changed by udanax.
http://wiki.apache.org/hama/MatMult?action=diff&rev1=14&rev2=15

--------------------------------------------------

- == Multiplication of matrices ==
+ == Multiplication of two matrices with Hama ==
+ 
+ In the examples package of Hama, we have introduced two MapReduce-based approaches and
+ illustrated them with examples.
+ 
+ === Iterative Approach ===
+ 
+ The iterative approach is simple and naive. Initially, each map task receives a row index
+ of B as a key and the column vector of that row as its value. Then, it multiplies all columns
+ of the i-th row of A with the received column vector. Finally, a reduce task collects the
+ i-th product into the result matrix.
  
  {{{
+ For i = 0 step 1 until N - 1
+   Job: Computes the i-th row of C = Matrix-Vector multiplication
-   public static void main() throws IOException {
-     HamaConfiguration conf = new HamaConfiguration();
-     Matrix m1 = DenseMatrix.random(conf, 1000, 1000);
-     Matrix m2 = DenseMatrix.random(conf, 1000, 1000);
- 
-     Matrix result = m1.mult(m2);
-     // or multiplication using blocking algorithm
-     Matrix result = m1.mult(m2, 100);
-  
-   }
  }}}
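+ 
+ As a rough illustration of what the per-row jobs compute, here is a minimal sequential
+ Java sketch (plain arrays instead of Hama's distributed matrices; all names are
+ illustrative, not Hama API):
+ 
+ {{{
+ // Sequential sketch of the iterative approach: iteration i plays the role
+ // of one MapReduce job that produces the i-th row of C = A * B.
+ public class IterativeMatMultSketch {
+   public static void main(String[] args) {
+     double[][] a = { { 1, 2 }, { 3, 4 } };
+     double[][] b = { { 5, 6 }, { 7, 8 } };
+     int n = a.length;
+     double[][] c = new double[n][n];
+ 
+     for (int i = 0; i < n; i++) {      // one "job" per row of C
+       // "map": pair A[i][k] with the k-th row vector of B
+       for (int k = 0; k < n; k++) {
+         // "reduce": sum the partial products into the i-th row of C
+         for (int j = 0; j < n; j++) {
+           c[i][j] += a[i][k] * b[k][j];
+         }
+       }
+     }
+ 
+     // prints [[19.0, 22.0], [43.0, 50.0]]
+     System.out.println(java.util.Arrays.deepToString(c));
+   }
+ }
+ }}}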
  
- == Multiplication of file matrices ==
+ === Block Approach ===
+ 
+ To multiply two dense matrices A and B, we first build the "collectionTable" in the
+ pre-processing phase of !MapReduce. The collectionTable is a 1-D representation, transformed
+ from the original 2-D representation of the two matrices. Each row of the collectionTable
+ holds two submatrices, A(i,k) and B(k,j), at the row index (n^2 * i) + (n * j) + k, where n
+ denotes the row size of matrices A and B. We call these submatrices a block. Each map task
+ walks only over the collectionTable instead of the original matrices, which significantly
+ reduces the data movement required over the network. The following pseudo-code shows the
+ block algorithm after pre-processing: each map task receives a blockID as a key and two
+ submatrices of A and B as its value, then multiplies the two submatrices, A[i][j] * B[j][k];
+ afterward, a reduce task computes the summation of the blocks, s[i][k] += multiplied blocks.
  
  {{{
+ Blocking jobs:
-  public static void main(String[] args) throws IOException {
-     collectBlocksFromFile(path[0], true, collectionTable, conf); // path of matrix A
-     collectBlocksFromFile(path[1], false, collectionTable, conf); // path of matrix B 
  
+ Collect the blocks from A and B into the 'collectionTable'.
-     Matrix result = new DenseMatrix(conf, 4, 4);
-     Job job = new Job(conf, "multiplication MR job : " + result.getPath());
  
-     Scan scan = new Scan();
-     scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+ - A map task receives a row index n as a key, and the vector of that row as its value
+    - emit (blockID, sub-vector) pairs
+ - A reduce task merges the block structures based on the blockID information
  
+ Multiplication job:
-     TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
-         BlockMultMap.class, BlockID.class, BytesWritable.class, job);
-     TableMapReduceUtil.initTableReducerJob(result.getPath(),
-         BlockMultReduce.class, job);
  
+ - A map task receives a blockID as a key, and two sub-matrices of A and B as its value
+    - Multiply the two sub-matrices: A[i][j] * B[j][k]
+ - A reduce task computes the sum of the blocks
+    - s[i][k] += multiplied blocks
-     try {
-       job.waitForCompletion(true);
-     } catch (InterruptedException e) {
-       e.printStackTrace();
-     } catch (ClassNotFoundException e) {
-       e.printStackTrace();
-     }
-   }
- 
-   private static void collectBlocksFromFile(Path path, boolean b,
-       String collectionTable, HamaConfiguration conf) throws IOException {
-     Job job = new Job(conf, "Blocking MR job" + path);
-     job.setMapperClass(MyMapper.class);
-     job.setMapOutputKeyClass(BlockID.class);
-     job.setMapOutputValueClass(MapWritable.class);
-     job.setInputFormatClass(SequenceFileInputFormat.class);
-     SequenceFileInputFormat.addInputPath(job, path);
- 
-     job.getConfiguration().set(MyMapper.BLOCK_SIZE, String.valueOf(2));
-     job.getConfiguration().set(MyMapper.ROWS, String.valueOf(4));
-     job.getConfiguration().set(MyMapper.COLUMNS, String.valueOf(4));
-     job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b);
-     
-     TableMapReduceUtil.initTableReducerJob(collectionTable,
-         org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
- 
-     try {
-       job.waitForCompletion(true);
-     } catch (InterruptedException e) {
-       e.printStackTrace();
-     } catch (ClassNotFoundException e) {
-       e.printStackTrace();
-     }
-   }
- 
-   public static class MyMapper extends
-       Mapper<IntWritable, MapWritable, BlockID, MapWritable> implements
-       Configurable {
-     private Configuration conf = null;
-     /** Parameter of the path of the matrix to be blocked * */
-     public static final String BLOCK_SIZE = "hama.blocking.size";
-     public static final String ROWS = "hama.blocking.rows";
-     public static final String COLUMNS = "hama.blocking.columns";
-     public static final String MATRIX_POS = "a.or.b";
- 
-     private int mBlockNum;
-     private int mBlockRowSize;
-     private int mBlockColSize;
-     private int mRows;
-     private int mColumns;
- 
-     public void map(IntWritable key, MapWritable value, Context context)
-         throws IOException, InterruptedException {
-       int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
-       DenseVector dv = new DenseVector(key.get(), value);
- 
-       do {
-         startColumn = i * mBlockColSize;
-         endColumn = startColumn + mBlockColSize - 1;
-         if (endColumn >= mColumns) // the last sub vector
-           endColumn = mColumns - 1;
-         context.write(new BlockID(blkRow, i), dv.subVector(startColumn,
-             endColumn).getEntries());
- 
-         i++;
-       } while (endColumn < (mColumns - 1));
-     }
- 
-     @Override
-     public Configuration getConf() {
-       return conf;
-     }
- 
-     @Override
-     public void setConf(Configuration conf) {
-       this.conf = conf;
- 
-       mBlockNum = conf.getInt(BLOCK_SIZE, 2);
-       mRows = conf.getInt(ROWS, 4);
-       mColumns = conf.getInt(COLUMNS, 4);
- 
-       mBlockRowSize = mRows / mBlockNum;
-       mBlockColSize = mColumns / mBlockNum;
-     }
-   }
  }}}
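+ 
+ To make the blockID indexing concrete, the following plain-Java sketch (illustrative
+ names, not Hama API) computes the collectionTable row index (n^2 * i) + (n * j) + k and
+ decodes it back, assuming i, j, and k each range over the n blocks per dimension:
+ 
+ {{{
+ // Sketch of collectionTable indexing: the block pair (A(i,k), B(k,j)) is
+ // stored at row (n^2 * i) + (n * j) + k. Here n is taken to be the number
+ // of blocks per dimension, so each of i, j, k lies in [0, n).
+ public class BlockIndexSketch {
+   static int rowIndex(int n, int i, int j, int k) {
+     return n * n * i + n * j + k;
+   }
+ 
+   public static void main(String[] args) {
+     int n = 4;                       // a 4x4 grid of blocks
+     int idx = rowIndex(n, 2, 3, 1);  // k = 1 term of result block C(2,3)
+     System.out.println(idx);         // 45
+ 
+     // Decode the 1-D row index back into block coordinates (i, j, k):
+     int i = idx / (n * n);           // 2
+     int j = (idx % (n * n)) / n;     // 3
+     int k = idx % n;                 // 1
+     System.out.println(i + "," + j + "," + k);
+   }
+ }
+ }}}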
  
