incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r751922 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/DenseMatrix.java src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java src/test/org/apache/hama/TestDenseMatrix.java
Date Tue, 10 Mar 2009 00:36:44 GMT
Author: edwardyoon
Date: Tue Mar 10 00:36:43 2009
New Revision: 751922

URL: http://svn.apache.org/viewvc?rev=751922&view=rev
Log:
Combine multi-mapreduce jobs into a single mapreduce job while computing such as 'C=alpha*B + A'

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=751922&r1=751921&r2=751922&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Mar 10 00:36:43 2009
@@ -36,6 +36,8 @@
     
   IMPROVEMENTS
 
+    HAMA-154: Combine multi-mapreduce jobs into a single mapreduce job 
+                while computing (samuel via edwardyoon)
     HAMA-152: Replace MapWritable to hadoop.io.MapWritable (edwardyoon)
     HAMA-109: Implement add(double alpha, Matrix B) (edwardyoon)
     HAMA-150: Refactor blockingMapRed (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=751922&r1=751921&r2=751922&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Tue Mar 10 00:36:43 2009
@@ -96,7 +96,7 @@
     // if force is set to true:
     // 1) if this matrixName has aliase to other matrix, we will remove
     // the old aliase, create a new matrix table, and aliase to it.
-    
+
     // 2) if this matrixName has no aliase to other matrix, we will create
     // a new matrix table, and alise to it.
     //
@@ -306,13 +306,13 @@
    * @throws IOException
    */
   public double get(int i, int j) throws IOException {
-    if(this.getRows() < i || this.getColumns() < j)
-      throw new ArrayIndexOutOfBoundsException(i +", "+ j);
-    
+    if (this.getRows() < i || this.getColumns() < j)
+      throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+
     Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j));
-    if(c == null)
+    if (c == null)
       throw new NullPointerException("Unexpected null");
-    
+
     return BytesUtil.bytesToDouble(c.getValue());
   }
 
@@ -326,7 +326,7 @@
   public DenseVector getRow(int i) throws IOException {
     return new DenseVector(table.getRow(BytesUtil.getRowIndex(i)));
   }
-  
+
   /**
    * Gets the vector of column
    * 
@@ -342,8 +342,8 @@
     MapWritable trunk = new MapWritable();
 
     for (RowResult row : scan) {
-      trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())), new DoubleEntry(row
-          .get(columnKey)));
+      trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())),
+          new DoubleEntry(row.get(columnKey)));
     }
 
     return new DenseVector(trunk);
@@ -355,7 +355,7 @@
     update.put(j, value);
     table.commit(update.getBatchUpdate());
   }
-  
+
   /**
    * Set the row of a matrix to a given vector
    * 
@@ -364,9 +364,9 @@
    * @throws IOException
    */
   public void setRow(int row, Vector vector) throws IOException {
-    if(this.getRows() < row)
+    if (this.getRows() < row)
       increaseRows();
-    
+
     VectorUpdate update = new VectorUpdate(row);
     update.putAll(((DenseVector) vector).getEntries());
     table.commit(update.getBatchUpdate());
@@ -382,7 +382,7 @@
   public void setColumn(int column, Vector vector) throws IOException {
     if (this.getColumns() < column)
       increaseColumns();
-      
+
     for (int i = 0; i < vector.size(); i++) {
       VectorUpdate update = new VectorUpdate(i);
       update.put(column, vector.get(i));
@@ -391,15 +391,16 @@
   }
 
   /**
-   * C = B + A
+   * C = alpha*B + A
    * 
+   * @param alpha
    * @param B
    * @return C
    * @throws IOException
    */
-  public DenseMatrix add(Matrix B) throws IOException {
+  public DenseMatrix add(double alpha, Matrix B) throws IOException {
     ensureForAddition(B);
-    
+
     DenseMatrix result = new DenseMatrix(config);
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("addition MR job" + result.getPath());
@@ -407,9 +408,9 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(),
-        RowCyclicAdditionMap.class, IntWritable.class, MapWritable.class,
-        jobConf);
+    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), Double
+        .toString(alpha), RowCyclicAdditionMap.class, IntWritable.class,
+        MapWritable.class, jobConf);
     RowCyclicAdditionReduce.initJob(result.getPath(),
         RowCyclicAdditionReduce.class, jobConf);
 
@@ -418,28 +419,57 @@
   }
 
   /**
-   * C = alpha*B + A
+   * C = B + A
    * 
-   * @param alpha
    * @param B
    * @return C
    * @throws IOException
    */
-  public DenseMatrix add(double alpha, Matrix B) throws IOException {
-    ensureForAddition(B);
-    
-    DenseMatrix temp = new DenseMatrix(config);
-    temp.set(alpha, B);
-    DenseMatrix result = this.add(temp);
+  public DenseMatrix add(Matrix B) throws IOException {
+    return add(1.0, B);
+  }
+
+  public DenseMatrix add(Matrix... matrices) throws IOException {
+    // ensure all the matrices are suitable for addition.
+    for (Matrix m : matrices) {
+      ensureForAddition(m);
+    }
+
+    DenseMatrix result = new DenseMatrix(config);
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("addition MR job" + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    StringBuilder summandList = new StringBuilder();
+    StringBuilder alphaList = new StringBuilder();
+    for (Matrix m : matrices) {
+      summandList.append(m.getPath());
+      summandList.append(",");
+      alphaList.append("1");
+      alphaList.append(",");
+    }
+    summandList.deleteCharAt(summandList.length() - 1);
+    alphaList.deleteCharAt(alphaList.length() - 1);
+
+    RowCyclicAdditionMap.initJob(this.getPath(), summandList.toString(),
+        alphaList.toString(), RowCyclicAdditionMap.class, IntWritable.class,
+        MapWritable.class, jobConf);
+    RowCyclicAdditionReduce.initJob(result.getPath(),
+        RowCyclicAdditionReduce.class, jobConf);
+
+    JobManager.execute(jobConf, result);
     return result;
   }
-  
+
   private void ensureForAddition(Matrix m) throws IOException {
-    if(getRows()!= m.getRows() || getColumns() != m.getColumns()) {
-      throw new IOException("Matrices' rows and columns should be same while A+B.");
+    if (getRows() != m.getRows() || getColumns() != m.getColumns()) {
+      throw new IOException(
+          "Matrices' rows and columns should be same while A+B.");
     }
   }
-  
+
   /**
    * C = A*B using SIMD algorithm
    * 
@@ -449,7 +479,7 @@
    */
   public DenseMatrix mult(Matrix B) throws IOException {
     ensureForMultiplication(B);
-    
+
     DenseMatrix result = new DenseMatrix(config);
 
     JobConf jobConf = new JobConf(config);
@@ -458,8 +488,8 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(), SIMDMultiplyMap.class,
-        IntWritable.class, MapWritable.class, jobConf);
+    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(),
+        SIMDMultiplyMap.class, IntWritable.class, MapWritable.class, jobConf);
     SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
         jobConf);
     JobManager.execute(jobConf, result);
@@ -476,13 +506,13 @@
    */
   public DenseMatrix mult(Matrix B, int blocks) throws IOException {
     ensureForMultiplication(B);
-    
+
     DenseMatrix collectionTable = new DenseMatrix(config);
     LOG.info("Collect Blocks");
 
     collectBlocksMapRed(this.getPath(), collectionTable, blocks, true);
     collectBlocksMapRed(B.getPath(), collectionTable, blocks, false);
-    
+
     DenseMatrix result = new DenseMatrix(config);
 
     JobConf jobConf = new JobConf(config);
@@ -500,9 +530,9 @@
     // Should be collectionTable removed?
     return result;
   }
-  
+
   private void ensureForMultiplication(Matrix m) throws IOException {
-    if(getColumns() != m.getRows()) {
+    if (getColumns() != m.getRows()) {
       throw new IOException("A's columns should equal with B's rows while A*B.");
     }
   }
@@ -541,8 +571,8 @@
   }
 
   /**
-   * Returns the sub matrix formed by selecting certain rows and
-   * columns from a bigger matrix. The sub matrix is a in-memory operation only.
+   * Returns the sub matrix formed by selecting certain rows and columns from a
+   * bigger matrix. The sub matrix is a in-memory operation only.
    * 
    * @param i0 the start index of row
    * @param i1 the end index of row

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=751922&r1=751921&r2=751922&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Tue Mar 10 00:36:43 2009
@@ -39,25 +39,38 @@
 public class RowCyclicAdditionMap extends MapReduceBase implements
 Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
   static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class);
-  protected DenseMatrix matrix_b;
-  public static final String MATRIX_B = "hama.addition.matrix.b";
+  protected DenseMatrix[] matrix_summands;
+  protected double[] matrix_alphas;
+  public static final String MATRIX_SUMMANDS = "hama.addition.summands";
+  public static final String MATRIX_ALPHAS = "hama.addition.alphas";
 
   public void configure(JobConf job) {
     try {
-      matrix_b = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_B, ""));
+      String[] matrix_names = job.get(MATRIX_SUMMANDS, "").split(","); 
+      String[] matrix_alpha_strs = job.get(MATRIX_ALPHAS, "").split(",");
+      assert(matrix_names.length == matrix_alpha_strs.length && matrix_names.length >= 1);
+      
+      matrix_summands = new DenseMatrix[matrix_names.length];
+      matrix_alphas = new double[matrix_names.length];
+      for(int i=0; i<matrix_names.length; i++) {
+        matrix_summands[i] = new DenseMatrix(new HamaConfiguration(job), matrix_names[i]);
+        matrix_alphas[i] = Double.valueOf(matrix_alpha_strs[i]);
+      }
     } catch (IOException e) {
       LOG.warn("Load matrix_b failed : " + e.getMessage());
     }
   }
 
-  public static void initJob(String matrix_a, String matrix_b,
-      Class<RowCyclicAdditionMap> map, Class<IntWritable> outputKeyClass,
-      Class<MapWritable> outputValueClass, JobConf jobConf) {
+  public static void initJob(String matrix_a, String matrix_summandlist, 
+      String matrix_alphalist, Class<RowCyclicAdditionMap> map, 
+      Class<IntWritable> outputKeyClass, Class<MapWritable> outputValueClass,
+      JobConf jobConf) {
 
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
-    jobConf.set(MATRIX_B, matrix_b);
+    jobConf.set(MATRIX_SUMMANDS, matrix_summandlist);
+    jobConf.set(MATRIX_ALPHAS, matrix_alphalist);
 
     jobConf.setInputFormat(VectorInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
@@ -69,10 +82,13 @@
       OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
       throws IOException {
     
-    DenseVector a = matrix_b.getRow(key.get());
-    DenseVector b = new DenseVector(value);
-    DenseVector c = a.add(b);
-    output.collect(key, c.getEntries());
+    DenseVector result = new DenseVector(value);
+    DenseVector summand;
+    for(int i=0; i<matrix_summands.length; i++) {
+      summand = matrix_summands[i].getRow(key.get());
+      result = result.add(matrix_alphas[i], summand);
+    }
+    output.collect(key, result.getEntries());
 
   }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=751922&r1=751921&r2=751922&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Tue Mar 10 00:36:43 2009
@@ -41,6 +41,7 @@
   private static int SIZE = 10;
   private static Matrix m1;
   private static Matrix m2;
+  private static Matrix m3;
   private final static String aliase1 = "matrix_aliase_A";
   private final static String aliase2 = "matrix_aliase_B";
   private static HamaConfiguration conf;
@@ -59,6 +60,7 @@
 
         m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
         m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+        m3 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
       }
 
       protected void tearDown() {
@@ -165,6 +167,19 @@
       }
     }
   }
+  
+  public void testMultiMatrixAdd() throws IOException {
+    Matrix result = ((DenseMatrix)m1).add(m2, m3);
+    
+    assertEquals(result.getRows(), SIZE);
+    assertEquals(result.getColumns(), SIZE);
+    
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j) + m3.get(i, j));
+      }
+    }
+  }
 
   /**
    * Test matrices multiplication



Mime
View raw message