incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r743189 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/mapred/
Date Wed, 11 Feb 2009 01:10:22 GMT
Author: edwardyoon
Date: Wed Feb 11 01:10:22 2009
New Revision: 743189

URL: http://svn.apache.org/viewvc?rev=743189&view=rev
Log:
Refactor blockingMapRed

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=743189&r1=743188&r2=743189&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Feb 11 01:10:22 2009
@@ -34,6 +34,7 @@
     
   IMPROVEMENTS
     
+    HAMA-150: Refactor blockingMapRed (edwardyoon)
     HAMA-148: Implement of set(double alpha, Matrix B) (edwardyoon)
     HAMA-100: Implement of set(Matrix B) (edwardyoon)
     HAMA-144: GetProgress during MR over a matrix (samuel)

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=743189&r1=743188&r2=743189&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Feb 11 01:10:22 2009
@@ -49,9 +49,10 @@
 import org.apache.hama.io.MapWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.BlockingMapRed;
+import org.apache.hama.mapred.CollectBlocksMapper;
 import org.apache.hama.mapred.RandomMatrixMap;
 import org.apache.hama.mapred.RandomMatrixReduce;
+import org.apache.hama.mapred.VectorInputFormat;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.JobManager;
 import org.apache.hama.util.RandomVariable;
@@ -250,7 +251,7 @@
     LOG.info("Create the " + m + " * " + n + " random matrix : "
         + rand.getPath());
     rand.setDimension(m, n);
-    
+
     JobConf jobConf = new JobConf(conf);
     jobConf.setJobName("random matrix MR job : " + rand.getPath());
 
@@ -262,9 +263,9 @@
     jobConf.setMapperClass(RandomMatrixMap.class);
     jobConf.setMapOutputKeyClass(IntWritable.class);
     jobConf.setMapOutputValueClass(VectorWritable.class);
-    
+
     RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
-            jobConf);
+        jobConf);
     jobConf.setSpeculativeExecution(false);
     jobConf.set("matrix.column", String.valueOf(n));
 
@@ -381,17 +382,16 @@
   }
 
   public Matrix mult(Matrix B) throws IOException {
-  Matrix result = new DenseMatrix(config);
-    
+    Matrix result = new DenseMatrix(config);
+
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("multiplication MR job : " + result.getPath());
 
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
-    
-    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
-        SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
-        jobConf);
+
+    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), SIMDMultiplyMap.class,
+        IntWritable.class, VectorWritable.class, jobConf);
     SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
         jobConf);
     JobManager.execute(jobConf, result);
@@ -409,28 +409,32 @@
   public Matrix mult(Matrix B, int blocks) throws IOException {
     Matrix collectionTable = new DenseMatrix(config);
     LOG.info("Collect Blocks");
-    collectBlocks(this, collectionTable, blocks, true);
-    collectBlocks(B, collectionTable, blocks, false);
+    collectBlocks(this, B, blocks, collectionTable);
 
     Matrix result = new DenseMatrix(config);
-    
+
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("multiplication MR job : " + result.getPath());
 
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
-    
-    BlockMultiplyMap.initJob(collectionTable.getPath(), 
-        BlockMultiplyMap.class, BlockID.class, BlockWritable.class,
+
+    BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class,
+        BlockID.class, BlockWritable.class, jobConf);
+    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
         jobConf);
-    BlockMultiplyReduce.initJob(result.getPath(),
-        BlockMultiplyReduce.class, jobConf);
 
     JobManager.execute(jobConf, result);
     // Should be collectionTable removed?
     return result;
   }
 
+  private void collectBlocks(Matrix a, Matrix b, int blocks,
+      Matrix collectionTable) throws IOException {
+    collectBlocksMapRed(a.getPath(), collectionTable, blocks, true);
+    collectBlocksMapRed(b.getPath(), collectionTable, blocks, false);
+  }
+
   public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
     // TODO Auto-generated method stub
     return null;
@@ -471,8 +475,8 @@
       cols[jj] = BytesUtil.getColumnIndex(j);
     }
 
-    Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), 
-        BytesUtil.getRowIndex(i1 + 1));
+    Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), BytesUtil
+        .getRowIndex(i1 + 1));
     Iterator<RowResult> it = scan.iterator();
     int i = 0;
     RowResult rs = null;
@@ -491,13 +495,13 @@
   /**
    * Collect Blocks
    * 
-   * @param resource
+   * @param path
    * @param collectionTable
    * @param blockNum
    * @param bool
    * @throws IOException
    */
-  public void collectBlocks(Matrix resource, Matrix collectionTable, 
+  public void collectBlocksMapRed(String path, Matrix collectionTable,
       int blockNum, boolean bool) throws IOException {
     double blocks = Math.pow(blockNum, 0.5);
     if (!String.valueOf(blocks).endsWith(".0"))
@@ -505,15 +509,21 @@
 
     int block_size = (int) blocks;
     collectionTable.setDimension(block_size, block_size);
-    
+
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("Blocking MR job" + getPath());
 
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    jobConf.setMapperClass(CollectBlocksMapper.class);
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+    FileInputFormat.addInputPaths(jobConf, path);
+
+    CollectBlocksMapper.initJob(collectionTable.getPath(), bool, block_size,
+        this.getRows(), this.getColumns(), jobConf);
 
-    BlockingMapRed.initJob(resource.getPath(), collectionTable.getPath(), 
-        bool, block_size, this.getRows(), this.getColumns(), jobConf);
     JobManager.execute(jobConf);
   }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java?rev=743189&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java Wed Feb 11 01:10:22 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * Abstract Blocking Map/Reduce Class to configure the job.
+ */
+public abstract class CollectBlocksBase extends MapReduceBase {
+  /** Job parameter: the number of blocks along each dimension of the matrix */
+  public static final String BLOCK_SIZE = "hama.blocking.size";
+  public static final String ROWS = "hama.blocking.rows";
+  public static final String COLUMNS = "hama.blocking.columns";
+  public static final String MATRIX_POS = "a.ore.b";
+
+  protected int mBlockNum;
+  protected int mBlockRowSize;
+  protected int mBlockColSize;
+  protected int mRows;
+  protected int mColumns;
+  protected boolean matrixPos;
+
+  @Override
+  public void configure(JobConf job) {
+    mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+    mRows = Integer.parseInt(job.get(ROWS, ""));
+    mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+    mBlockRowSize = mRows / mBlockNum;
+    mBlockColSize = mColumns / mBlockNum;
+
+    matrixPos = job.getBoolean(MATRIX_POS, true);
+  }
+
+  /**
+   * Initialize a job for blocking a table
+   */
+  @SuppressWarnings("unchecked")
+  public static void initJob(String collectionTable, boolean bool,
+      int block_size, int i, int j, JobConf job) {
+    job.setReducerClass(CollectBlocksReducer.class);
+    job.setMapOutputKeyClass(BlockID.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputFormat(BlockOutputFormat.class);
+    job.setOutputKeyClass(BlockID.class);
+    job.setOutputValueClass(BlockWritable.class);
+    job.set(BLOCK_SIZE, String.valueOf(block_size));
+    job.set(ROWS, String.valueOf(i));
+    job.set(COLUMNS, String.valueOf(j));
+    job.setBoolean(MATRIX_POS, bool);
+    job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable);
+
+    if (bool)
+      job.set(BlockOutputFormat.COLUMN, "a");
+    else
+      job.set(BlockOutputFormat.COLUMN, "b");
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java?rev=743189&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java Wed Feb 11 01:10:22 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorWritable;
+
+@SuppressWarnings("unchecked")
+public interface CollectBlocksMap<K extends WritableComparable, V extends Writable>
+    extends Mapper<K, V, BlockID, VectorWritable> {
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=743189&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Wed Feb 11 01:10:22 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * A Map/Reduce helper class for blocking a DenseMatrix into a block-formatted matrix
+ */
+public class CollectBlocksMapper extends CollectBlocksBase implements
+    CollectBlocksMap<IntWritable, VectorWritable> {
+
+  @Override
+  public void map(IntWritable key, VectorWritable value,
+      OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+      throws IOException {
+    int startColumn;
+    int endColumn;
+    int blkRow = key.get() / mBlockRowSize;
+    DenseVector dv = value.getDenseVector();
+
+    int i = 0;
+    do {
+      startColumn = i * mBlockColSize;
+      endColumn = startColumn + mBlockColSize - 1;
+      if (endColumn >= mColumns) // the last sub vector
+        endColumn = mColumns - 1;
+      output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), dv
+          .subVector(startColumn, endColumn)));
+
+      i++;
+    } while (endColumn < (mColumns - 1));
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=743189&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Wed Feb 11 01:10:22 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * Rows are named as c(i, j) with the sequential number (N^2 * i) + (j * N) + k
+ * to avoid duplicated records. Each row has two sub-matrices, a(i, k) and
+ * b(k, j).
+ */
+public class CollectBlocksReducer extends CollectBlocksBase implements
+    Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
+
+  @Override
+  public void reduce(BlockID key, Iterator<VectorWritable> values,
+      OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+      throws IOException {
+    // Note: all the sub-vectors are grouped by {@link
+    // org.apache.hama.io.BlockID}
+
+    // the block's base offset in the original matrix
+    int colBase = key.getColumn() * mBlockColSize;
+    int rowBase = key.getRow() * mBlockRowSize;
+
+    // the block's size : rows & columns
+    int smRows = mBlockRowSize;
+    if ((rowBase + mBlockRowSize - 1) >= mRows)
+      smRows = mRows - rowBase;
+    int smCols = mBlockColSize;
+    if ((colBase + mBlockColSize - 1) >= mColumns)
+      smCols = mColumns - colBase;
+
+    // construct the matrix
+    SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+
+    // i, j is the current offset in the sub-matrix
+    int i = 0, j = 0;
+    while (values.hasNext()) {
+      VectorWritable vw = values.next();
+      // check the size is suitable
+      if (vw.size() != smCols)
+        throw new IOException("Block Column Size dismatched.");
+      i = vw.row - rowBase;
+      if (i >= smRows || i < 0)
+        throw new IOException("Block Row Size dismatched.");
+
+      // put the subVector to the subMatrix
+      for (j = 0; j < smCols; j++) {
+        subMatrix.set(i, j, vw.get(colBase + j));
+      }
+    }
+    BlockWritable outValue = new BlockWritable(subMatrix);
+
+    // It'll be used only for matrix multiplication.
+    if (matrixPos) {
+      for (int x = 0; x < mBlockNum; x++) {
+        int r = (key.getRow() * mBlockNum) * mBlockNum;
+        int seq = (x * mBlockNum) + key.getColumn() + r;
+        output.collect(new BlockID(key.getRow(), x, seq), outValue);
+      }
+    } else {
+      for (int x = 0; x < mBlockNum; x++) {
+        int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+            + key.getRow();
+        output.collect(new BlockID(x, key.getColumn(), seq), outValue);
+      }
+    }
+  }
+}



Mime
View raw message