incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r766595 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/ src/java/org/apache/hama/util/ src/test/org/apache/hama/ src/test/org/apache/hama/mapred/
Date Mon, 20 Apr 2009 06:56:06 GMT
Author: edwardyoon
Date: Mon Apr 20 06:56:05 2009
New Revision: 766595

URL: http://svn.apache.org/viewvc?rev=766595&view=rev
Log:
Implementation of random sparse matrix using mapreduce

Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultReduce.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
    incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java
    incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java
    incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
    incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
    incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Apr 20 06:56:05 2009
@@ -33,9 +33,10 @@
     HAMA-7: Add some information for a new comer (edwardyoon)
     HAMA-1: Create the Hama web site (edwardyoon via Ian Holsman)
     HAMA-2: The intial donation of Hama from the google project (edwardyoon)
-    
+
   IMPROVEMENTS
 
+    HAMA-158: Implementation of random sparse matrix (edwardyoon)
     HAMA-164: Example for C = Alpha*B + A (edwardyoon)
     HAMA-154: Combine multi-mapreduce jobs into a single mapreduce job 
                 while computing (samuel via edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Mon Apr 20 06:56:05 2009
@@ -22,7 +22,10 @@
 import java.io.IOException;
 
 import org.apache.hama.DenseMatrix;
+import org.apache.hama.HamaAdmin;
+import org.apache.hama.HamaAdminImpl;
 import org.apache.hama.Matrix;
+import org.apache.hama.SparseMatrix;
 
 public class MatrixMultiplication extends AbstractExample {
   public static void main(String[] args) throws IOException {
@@ -37,14 +40,24 @@
     String matrixA = ARGS.get(0);
     String matrixB = ARGS.get(1);
 
-    DenseMatrix a = new DenseMatrix(conf, matrixA, false);
-    DenseMatrix b = new DenseMatrix(conf, matrixB, false);
+    HamaAdmin admin = new HamaAdminImpl(conf);
+    Matrix a = admin.getMatrix(matrixA);
+    Matrix b = admin.getMatrix(matrixB);
+
+    if (!a.getType().equals(b.getType())) {
+      System.out.println(a.getType() + " != " + b.getType());
+      System.exit(-1);
+    }
+
     Matrix c;
-    
-    if (ARGS.size() > 2) {
-      c = a.mult(b, Integer.parseInt(ARGS.get(2)));
+    if (a.getType().equals("SparseMatrix")) {
+      c = ((SparseMatrix) a).mult(b);
     } else {
-      c = a.mult(b);
+      if (ARGS.size() > 2) {
+        c = ((DenseMatrix) a).mult(b, Integer.parseInt(ARGS.get(2)));
+      } else {
+        c = ((DenseMatrix) a).mult(b);
+      }
     }
 
     for (int i = 0; i < 2; i++) {

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java Mon Apr 20 06:56:05 2009
@@ -22,13 +22,15 @@
 import java.io.IOException;
 
 import org.apache.hama.DenseMatrix;
+import org.apache.hama.Matrix;
+import org.apache.hama.SparseMatrix;
 
 public class RandomMatrix extends AbstractExample {
 
   public static void main(String[] args) throws IOException {
     if (args.length < 3) {
       System.out
-          .println("rand [-m maps] [-r reduces] <rows> <columns> <matrix_name>");
+          .println("rand [-m maps] [-r reduces] <rows> <columns> <sparse | dense> <matrix_name>");
       System.exit(-1);
     } else {
       parseArgs(args);
@@ -37,7 +39,12 @@
     int row = Integer.parseInt(ARGS.get(0));
     int column = Integer.parseInt(ARGS.get(1));
 
-    DenseMatrix a = DenseMatrix.random_mapred(conf, row, column);
-    a.save(ARGS.get(2));
+    Matrix a;
+    if(ARGS.get(2).equals("sparse"))
+      a = SparseMatrix.random_mapred(conf, row, column);
+    else
+      a = DenseMatrix.random_mapred(conf, row, column);
+    
+    a.save(ARGS.get(3));
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Mon Apr 20 06:56:05 2009
@@ -376,6 +376,7 @@
     // ! one matrix has only one aliasename now.
     BatchUpdate update = new BatchUpdate(Constants.METADATA);
     update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
+    update.put(Constants.ATTRIBUTE + "type", Bytes.toBytes(this.getType()));
     table.commit(update);
 
     return hamaAdmin.save(this, aliasename);

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java Mon Apr 20 06:56:05 2009
@@ -30,11 +30,13 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
 
 /**
  * Methods of the vector classes
  */
 public abstract class AbstractVector {
+  static final Logger LOG = Logger.getLogger(AbstractVector.class);
   protected MapWritable entries;
 
   public void initMap(RowResult row) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Apr 20 06:56:05 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.client.Scanner;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -41,10 +42,10 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hama.algebra.BlockMultiplyMap;
 import org.apache.hama.algebra.BlockMultiplyReduce;
+import org.apache.hama.algebra.DenseMatrixVectorMultMap;
+import org.apache.hama.algebra.DenseMatrixVectorMultReduce;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
 import org.apache.hama.algebra.RowCyclicAdditionReduce;
-import org.apache.hama.algebra.SIMDMultiplyMap;
-import org.apache.hama.algebra.SIMDMultiplyReduce;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
@@ -242,7 +243,8 @@
         jobConf);
     jobConf.setSpeculativeExecution(false);
     jobConf.set("matrix.column", String.valueOf(n));
-
+    jobConf.set("matrix.type", TABLE_PREFIX);
+    
     jobConf.setInputFormat(SequenceFileInputFormat.class);
     final FileSystem fs = FileSystem.get(jobConf);
     int interval = m / conf.getNumMapTasks();
@@ -326,7 +328,7 @@
    * @throws IOException
    */
   public DenseVector getRow(int i) throws IOException {
-    return new DenseVector(table.getRow(BytesUtil.getRowIndex(i)));
+    return new DenseVector(table.getRow(BytesUtil.getRowIndex(i), new byte[][] { Bytes.toBytes(Constants.COLUMN) }));
   }
 
   /**
@@ -475,7 +477,7 @@
   }
 
   /**
-   * C = A*B using SIMD algorithm
+   * C = A*B using iterative method
    * 
    * @param B
    * @return C
@@ -483,20 +485,23 @@
    */
   public DenseMatrix mult(Matrix B) throws IOException {
     ensureForMultiplication(B);
-
     DenseMatrix result = new DenseMatrix(config);
+    
+    for(int i = 0; i < this.getRows(); i++) {
+      JobConf jobConf = new JobConf(config);
+      jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i);
+
+      jobConf.setNumMapTasks(config.getNumMapTasks());
+      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+      DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), DenseMatrixVectorMultMap.class,
+          IntWritable.class, MapWritable.class, jobConf);
+      DenseMatrixVectorMultReduce.initJob(result.getPath(), DenseMatrixVectorMultReduce.class,
+          jobConf);
+      JobManager.execute(jobConf);
+    }
 
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("multiplication MR job : " + result.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(),
-        SIMDMultiplyMap.class, IntWritable.class, MapWritable.class, jobConf);
-    SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-        jobConf);
-    JobManager.execute(jobConf, result);
+    result.setDimension(this.getRows(), this.getColumns());
     return result;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java Mon Apr 20 06:56:05 2009
@@ -302,4 +302,10 @@
 
     return res;
   }
+
+  public void zeroFill(int size) {
+    for(int i = 0; i < size; i++) {
+      this.set(i, 0);
+    }
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java Mon Apr 20 06:56:05 2009
@@ -1,59 +1,68 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-
-/**
- * A administration interface to manage the matrix's namespace, and table
- * allocation & garbage collection.
- */
-public interface HamaAdmin {
-
-  /**
-   * Saves matrix as name 'AliaseName'
-   * 
-   * @param matrix
-   * @param aliaseName
-   * @return true if it saved
-   */
-  public boolean save(Matrix matrix, String aliaseName);
-
-  /**
-   * @param name
-   * @return return a physical path of matrix
-   */
-  public String getPath(String name);
-
-  /**
-   * @param matrixName
-   * @return true if matrix is exist
-   */
-  public boolean matrixExists(String matrixName);
-
-  /**
-   * Deletes matrix
-   * 
-   * @param matrixName
-   * @throws IOException
-   */
-  public void delete(String matrixName) throws IOException;
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+/**
+ * A administration interface to manage the matrix's namespace, and table
+ * allocation & garbage collection.
+ */
+public interface HamaAdmin {
+
+  /**
+   * Saves matrix as name 'AliaseName'
+   * 
+   * @param matrix
+   * @param aliaseName
+   * @return true if it saved
+   */
+  public boolean save(Matrix matrix, String aliaseName);
+
+  /**
+   * @param name
+   * @return return a physical path of matrix
+   */
+  public String getPath(String name);
+
+  /**
+   * @param matrixName
+   * @return true if matrix is exist
+   */
+  public boolean matrixExists(String matrixName);
+
+  /**
+   * Deletes matrix
+   * 
+   * @param matrixName
+   * @throws IOException
+   */
+  public void delete(String matrixName) throws IOException;
+
+  /**
+   * Load matrix
+   * 
+   * @param matrixName
+   * @return the matrix
+   * @throws IOException 
+   */
+  public Matrix getMatrix(String matrixName) throws IOException;
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java Mon Apr 20 06:56:05 2009
@@ -1,193 +1,214 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.RegionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-
-/**
- * An Implementation of {@link org.apache.hama.HamaAdmin} to manage the matrix's
- * namespace, and table allocation & garbage collection.
- */
-public class HamaAdminImpl implements HamaAdmin {
-  static final Logger LOG = Logger.getLogger(HamaAdminImpl.class);
-  protected HamaConfiguration conf;
-  protected HBaseAdmin admin;
-  protected HTable table;
-
-  /**
-   * Constructor
-   * 
-   * @param conf
-   * @throws MasterNotRunningException
-   */
-  public HamaAdminImpl(HamaConfiguration conf) throws MasterNotRunningException {
-    this.conf = conf;
-    this.admin = new HBaseAdmin(conf);
-    initialJob();
-  }
-
-  /**
-   * Constructor
-   * 
-   * @param conf
-   * @param admin
-   */
-  public HamaAdminImpl(HamaConfiguration conf, HBaseAdmin admin) {
-    this.conf = conf;
-    this.admin = admin;
-    initialJob();
-  }
-
-  /**
-   * Initializing the admin.
-   */
-  private void initialJob() {
-    try {
-      if (!admin.tableExists(Constants.ADMINTABLE)) {
-        HTableDescriptor tableDesc = new HTableDescriptor(Constants.ADMINTABLE);
-        tableDesc.addFamily(new HColumnDescriptor(Constants.PATHCOLUMN));
-        admin.createTable(tableDesc);
-      }
-
-      table = new HTable(conf, Constants.ADMINTABLE);
-      table.setAutoFlush(true);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  /**
-   * @param name
-   * @return real table name
-   */
-  public String getPath(String name) {
-    try {
-      byte[] result = table.get(name, Constants.PATHCOLUMN).getValue();
-      return Bytes.toString(result);
-    } catch (IOException e) {
-      e.printStackTrace();
-      return null;
-    }
-  }
-
-  public boolean matrixExists(String matrixName) {
-    try {
-      Cell result = table.get(matrixName, Constants.PATHCOLUMN);
-      return (result == null) ? false : true;
-    } catch (IOException e) {
-      e.printStackTrace();
-      return false;
-    }
-  }
-
-  public boolean save(Matrix mat, String aliaseName) {
-    boolean result = false;
-
-    // we just store the name -> path(tablename) here.
-    // the matrix type is stored in its hbase table. we don't need to store
-    // again.
-    BatchUpdate update = new BatchUpdate(aliaseName);
-    update.put(Constants.PATHCOLUMN, Bytes.toBytes(mat.getPath()));
-
-    try {
-      table.commit(update);
-
-      result = true;
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    return result;
-  }
-
-  /** remove the entry of 'matrixName' in admin table. * */
-  private void removeEntry(String matrixName) throws IOException {
-    table.deleteAll(matrixName);
-  }
-
-  private int getReference(String tableName) throws IOException {
-    HTable matrix = new HTable(conf, tableName);
-
-    Cell rows = null;
-    rows = matrix.get(Constants.METADATA, Constants.METADATA_REFERENCE);
-
-    return (rows == null) ? 0 : Bytes.toInt(rows.getValue());
-  }
-
-  private void clearAliaseInfo(String tableName) throws IOException {
-    HTable matrix = new HTable(conf, tableName);
-
-    matrix.deleteAll(Constants.METADATA, Constants.ALIASENAME);
-  }
-
-  /**
-   * we remove the aliase entry store in Admin table, and clear the aliase info
-   * store in matrix table. And check the reference of the matrix table:
-   * 
-   * 1) if the reference of the matrix table is zero: we delete the table. 
-   * 2) if the reference of the matrix table is not zero: we let the matrix who still
-   * reference the table to do the garbage collection.
-   */
-  public void delete(String matrixName) throws IOException {
-    if (matrixExists(matrixName)) {
-      String tablename = getPath(matrixName);
-
-      // i) remove the aliase entry first.
-      removeEntry(matrixName);
-
-      if (tablename == null) { // a matrixName point to a null table. we delete
-        // the entry.
-        return;
-      }
-
-      if (!admin.tableExists(tablename)) { // have not specified table.
-        return;
-      }
-
-      // ii) clear the aliase info store in matrix table.
-      clearAliaseInfo(tablename);
-
-      if (getReference(tablename) <= 0) { // no reference, do gc!!
-        if (admin.isTableEnabled(tablename)) {
-          while (admin.isTableEnabled(tablename)) {
-            try {
-              admin.disableTable(tablename);
-            } catch (RegionException e) {
-              LOG.warn(e);
-            }
-          }
-
-          admin.deleteTable(tablename);
-        }
-      }
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+/**
+ * An Implementation of {@link org.apache.hama.HamaAdmin} to manage the matrix's
+ * namespace, and table allocation & garbage collection.
+ */
+public class HamaAdminImpl implements HamaAdmin {
+  static final Logger LOG = Logger.getLogger(HamaAdminImpl.class);
+  protected HamaConfiguration conf;
+  protected HBaseAdmin admin;
+  protected HTable table;
+
+  /**
+   * Constructor
+   * 
+   * @param conf
+   * @throws MasterNotRunningException
+   */
+  public HamaAdminImpl(HamaConfiguration conf) throws MasterNotRunningException {
+    this.conf = conf;
+    this.admin = new HBaseAdmin(conf);
+    initialJob();
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param conf
+   * @param admin
+   */
+  public HamaAdminImpl(HamaConfiguration conf, HBaseAdmin admin) {
+    this.conf = conf;
+    this.admin = admin;
+    initialJob();
+  }
+
+  /**
+   * Initializing the admin.
+   */
+  private void initialJob() {
+    try {
+      if (!admin.tableExists(Constants.ADMINTABLE)) {
+        HTableDescriptor tableDesc = new HTableDescriptor(Constants.ADMINTABLE);
+        tableDesc.addFamily(new HColumnDescriptor(Constants.PATHCOLUMN));
+        admin.createTable(tableDesc);
+      }
+
+      table = new HTable(conf, Constants.ADMINTABLE);
+      table.setAutoFlush(true);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * @param name
+   * @return real table name
+   */
+  public String getPath(String name) {
+    try {
+      byte[] result = table.get(name, Constants.PATHCOLUMN).getValue();
+      return Bytes.toString(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  public boolean matrixExists(String matrixName) {
+    try {
+      Cell result = table.get(matrixName, Constants.PATHCOLUMN);
+      return (result == null) ? false : true;
+    } catch (IOException e) {
+      e.printStackTrace();
+      return false;
+    }
+  }
+
+  public boolean save(Matrix mat, String aliaseName) {
+    boolean result = false;
+
+    // we just store the name -> path(tablename) here.
+    // the matrix type is stored in its hbase table. we don't need to store
+    // again.
+    BatchUpdate update = new BatchUpdate(aliaseName);
+    update.put(Constants.PATHCOLUMN, Bytes.toBytes(mat.getPath()));
+
+    try {
+      table.commit(update);
+
+      result = true;
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return result;
+  }
+
+  /** remove the entry of 'matrixName' in admin table. * */
+  private void removeEntry(String matrixName) throws IOException {
+    table.deleteAll(matrixName);
+  }
+
+  private int getReference(String tableName) throws IOException {
+    HTable matrix = new HTable(conf, tableName);
+
+    Cell rows = null;
+    rows = matrix.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+
+    return (rows == null) ? 0 : Bytes.toInt(rows.getValue());
+  }
+
+  private void clearAliaseInfo(String tableName) throws IOException {
+    HTable matrix = new HTable(conf, tableName);
+
+    matrix.deleteAll(Constants.METADATA, Constants.ALIASENAME);
+  }
+
+  /**
+   * we remove the aliase entry store in Admin table, and clear the aliase info
+   * store in matrix table. And check the reference of the matrix table:
+   * 
+   * 1) if the reference of the matrix table is zero: we delete the table. 2) if
+   * the reference of the matrix table is not zero: we let the matrix who still
+   * reference the table to do the garbage collection.
+   */
+  public void delete(String matrixName) throws IOException {
+    if (matrixExists(matrixName)) {
+      String tablename = getPath(matrixName);
+
+      // i) remove the aliase entry first.
+      removeEntry(matrixName);
+
+      if (tablename == null) { // a matrixName point to a null table. we delete
+        // the entry.
+        return;
+      }
+
+      if (!admin.tableExists(tablename)) { // have not specified table.
+        return;
+      }
+
+      // ii) clear the aliase info store in matrix table.
+      clearAliaseInfo(tablename);
+
+      if (getReference(tablename) <= 0) { // no reference, do gc!!
+        if (admin.isTableEnabled(tablename)) {
+          while (admin.isTableEnabled(tablename)) {
+            try {
+              admin.disableTable(tablename);
+            } catch (RegionException e) {
+              LOG.warn(e);
+            }
+          }
+
+          admin.deleteTable(tablename);
+        }
+      }
+    }
+  }
+
+  @Override
+  public Matrix getMatrix(String matrixName) throws IOException {
+    String path = getPath(matrixName);
+    if(getType(path).equals("SparseMatrix"))
+      return new SparseMatrix(conf, path);
+    else
+      return new DenseMatrix(conf, path);
+  }
+
+  private String getType(String path) {
+    try {
+      HTable matrix = new HTable(conf, path);
+      byte[] result = matrix.get(Constants.METADATA,
+          Constants.ATTRIBUTE + "type").getValue();
+      return Bytes.toString(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java Mon Apr 20 06:56:05 2009
@@ -22,20 +22,34 @@
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hama.algebra.SIMDMultiplyMap;
-import org.apache.hama.algebra.SIMDMultiplyReduce;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hama.algebra.SparseMatrixVectorMultMap;
+import org.apache.hama.algebra.SparseMatrixVectorMultReduce;
 import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.JobManager;
 import org.apache.hama.util.RandomVariable;
 
 public class SparseMatrix extends AbstractMatrix implements Matrix {
   static private final String TABLE_PREFIX = SparseMatrix.class.getSimpleName();
+  static private final Path TMP_DIR = new Path(SparseMatrix.class
+      .getSimpleName()
+      + "_TMP_dir");
+  
   public SparseMatrix(HamaConfiguration conf) throws IOException {
     setConfiguration(conf);
 
@@ -96,6 +110,59 @@
     return rand;
   }
   
+  /**
+   * Creates an m-by-n random sparse matrix with a MapReduce job. Every map task
+   * is fed one (first row, last row) pair through a temporary sequence file.
+   */
+  public static SparseMatrix random_mapred(HamaConfiguration conf, int m, int n) throws IOException {
+    SparseMatrix rand = new SparseMatrix(conf);
+    LOG.info("Create the " + m + " * " + n + " random matrix : " + rand.getPath());
+    rand.setDimension(m, n);
+
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("random matrix MR job : " + rand.getPath());
+    final int numMapTasks = conf.getNumMapTasks();
+    jobConf.setNumMapTasks(numMapTasks);
+    jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+    final Path inDir = new Path(TMP_DIR, "in");
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    jobConf.setMapperClass(RandomMatrixMap.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(MapWritable.class);
+    RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class, jobConf);
+    jobConf.setSpeculativeExecution(false);
+    jobConf.set("matrix.column", String.valueOf(n));
+    jobConf.set("matrix.type", TABLE_PREFIX);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+    final FileSystem fs = FileSystem.get(jobConf);
+    final int interval = m / numMapTasks;
+    try {
+      // generate an input file for each map task
+      for (int i = 0; i < numMapTasks; ++i) {
+        final IntWritable start = new IntWritable(i * interval);
+        // the last task also takes the rows left over by the integer division
+        final IntWritable end = new IntWritable(
+            (i + 1 != numMapTasks) ? (i + 1) * interval - 1 : m - 1);
+        final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
+            new Path(inDir, "part" + i), IntWritable.class, IntWritable.class,
+            CompressionType.NONE);
+        try {
+          writer.append(start, end);
+        } finally {
+          writer.close();
+        }
+        LOG.info("Wrote input for Map #" + i);
+      }
+      JobClient.runJob(jobConf);
+    } finally {
+      // remove the temporary input even when the job fails
+      fs.delete(TMP_DIR, true);
+    }
+    return rand;
+  }
+  
   @Override
   public Matrix add(Matrix B) throws IOException {
     // TODO Auto-generated method stub
@@ -131,8 +198,7 @@
    * @throws IOException
    */
   public SparseVector getRow(int i) throws IOException {
-    // Should returns zero-fill vector.
-    return new SparseVector(table.getRow(BytesUtil.getRowIndex(i)));
+    return new SparseVector(table.getRow(BytesUtil.getRowIndex(i), new byte[][] { Bytes.toBytes(Constants.COLUMN) }));
   }
 
   /** {@inheritDoc} */
@@ -151,21 +217,32 @@
     return this.getClass().getSimpleName();
   }
 
-  @Override
+  /**
+   * C = A*B using iterative method: one MapReduce job is run for every row of
+   * this matrix (A), and each job writes into the result matrix C.
+   * 
+   * @param B the right-hand matrix
+   * @return C, with the rows of A and the columns of B
+   * @throws IOException
+   */
   public SparseMatrix mult(Matrix B) throws IOException {
     SparseMatrix result = new SparseMatrix(config);
 
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("multiplication MR job : " + result.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    for(int i = 0; i < this.getRows(); i++) {
+      JobConf jobConf = new JobConf(config);
+      jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i);
+
+      jobConf.setNumMapTasks(config.getNumMapTasks());
+      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+      
+      SparseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), SparseMatrixVectorMultMap.class,
+          IntWritable.class, MapWritable.class, jobConf);
+      SparseMatrixVectorMultReduce.initJob(result.getPath(), SparseMatrixVectorMultReduce.class,
+          jobConf);
+      JobManager.execute(jobConf);
+    }
 
-    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(), SIMDMultiplyMap.class,
-        IntWritable.class, MapWritable.class, jobConf);
-    SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-        jobConf);
-    JobManager.execute(jobConf);
-    result.setDimension(this.getRows(), this.getColumns());
+    // A (m x k) * B (k x n) is m x n, so the column count comes from B
+    result.setDimension(this.getRows(), B.getColumns());
     return result;
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java Mon Apr 20 06:56:05 2009
@@ -75,21 +75,13 @@
    * @return x = v + x
    */
   public SparseVector add(Vector v2) {
-    if (this.size() == 0) {
-      SparseVector trunk = (SparseVector) v2;
-      this.entries = trunk.entries;
-      return this;
-    }
 
     for (Map.Entry<Writable, Writable> e : v2.getEntries().entrySet()) {
+      int key = ((IntWritable) e.getKey()).get();
       if (this.entries.containsKey(e.getKey())) {
-        // add
-        double value = ((DoubleEntry) e.getValue()).getValue()
-            + this.get(((IntWritable) e.getKey()).get());
-        this.entries.put(e.getKey(), new DoubleEntry(value));
+        this.add(key, ((DoubleEntry) e.getValue()).getValue());
       } else {
-        // put
-        this.entries.put(e.getKey(), e.getValue());
+        this.set(key, ((DoubleEntry) e.getValue()).getValue());
       }
     }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java?rev=766595&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java Mon Apr 20 06:56:05 2009
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.log4j.Logger;
+
+/**
+ * Map side of the row-by-row dense multiplication C = A*B. One job handles a
+ * single row i of A: each input vector of B is scaled by the entry of that
+ * row selected by the input key, and emitted under the key i.
+ */
+public class DenseMatrixVectorMultMap extends MapReduceBase implements
+    Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultMap.class);
+  protected DenseVector currVector;
+  public static final String ITH_ROW = "ith.row";
+  public static final String MATRIX_A = "hama.multiplication.matrix.a";
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+  private IntWritable nKey = new IntWritable();
+
+  /**
+   * Loads row {@link #ITH_ROW} of matrix {@link #MATRIX_A} before any record
+   * is mapped.
+   * 
+   * @throws IllegalStateException if the row cannot be read; configure()
+   *           cannot throw IOException, and carrying on would only fail
+   *           later in map() on the missing row
+   */
+  @Override
+  public void configure(JobConf job) {
+    try {
+      DenseMatrix matrix_a = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
+      int ithRow = job.getInt(ITH_ROW, 0);
+      nKey.set(ithRow);
+      currVector = matrix_a.getRow(ithRow);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to load row " + job.getInt(ITH_ROW, 0)
+          + " of matrix " + job.get(MATRIX_A), e);
+    }
+  }
+
+  /**
+   * Sets up the map side of a job that multiplies row i of matrix_a by
+   * matrix_b.
+   * 
+   * @param i index of the row of matrix_a handled by the job
+   * @param matrix_a path of the left-hand matrix
+   * @param matrix_b path of the right-hand matrix, used as the job input
+   * @param map the mapper class
+   * @param outputKeyClass the map output key class
+   * @param outputValueClass the map output value class
+   * @param jobConf the job to configure
+   */
+  public static void initJob(int i, String matrix_a, String matrix_b,
+      Class<DenseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
+      Class<MapWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.setInt(ITH_ROW, i);
+    jobConf.set(MATRIX_A, matrix_a);
+    jobConf.set(MATRIX_B, matrix_b);
+
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_b);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  /**
+   * Scales the incoming vector by the entry of the current row of A found at
+   * the index given by key, and emits it under the current row's index.
+   */
+  @Override
+  public void map(IntWritable key, MapWritable value,
+      OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
+      throws IOException {
+    DenseVector scaled = new DenseVector(value).scale(currVector.get(key.get()));
+    output.collect(nKey, scaled.getEntries());
+  }
+}
\ No newline at end of file

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java?rev=766595&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java Mon Apr 20 06:56:05 2009
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+/**
+ * Reduce side of the row-by-row dense multiplication C = A*B: sums all the
+ * partial vectors emitted for a row and writes the sum as that row of C.
+ */
+public class DenseMatrixVectorMultReduce extends MapReduceBase implements
+    Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultReduce.class);
+
+  /**
+   * Use this before submitting a TableReduce job. It will appropriately set up
+   * the JobConf.
+   * 
+   * @param table the output table, stored as VectorOutputFormat.OUTPUT_TABLE
+   * @param reducer the reducer class
+   * @param job the job to configure
+   */
+  public static void initJob(String table,
+      Class<DenseMatrixVectorMultReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    // NOTE(review): reduce() emits VectorUpdate while BatchUpdate is declared
+    // here -- confirm this is what VectorOutputFormat expects
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  /**
+   * Adds up every partial vector received for the row key and emits the sum
+   * as a single VectorUpdate for that row.
+   */
+  @Override
+  public void reduce(IntWritable key, Iterator<MapWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    DenseVector sum = new DenseVector();
+
+    while (values.hasNext()) {
+      DenseVector nVector = new DenseVector(values.next());
+      // size the accumulator from the first partial vector before adding
+      if (sum.size() == 0) {
+        sum.zeroFill(nVector.size());
+      }
+      sum.add(nVector);
+    }
+
+    VectorUpdate update = new VectorUpdate(key.get());
+    update.putAll(sum.getEntries());
+
+    output.collect(key, update);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultMap.java?rev=766595&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultMap.java Mon Apr 20 06:56:05 2009
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SparseMatrix;
+import org.apache.hama.SparseVector;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.log4j.Logger;
+
+/**
+ * Map side of the row-by-row sparse multiplication C = A*B. One job handles a
+ * single row i of A: each input vector of B is scaled by the entry of that
+ * row selected by the input key, and emitted under the key i.
+ */
+public class SparseMatrixVectorMultMap extends MapReduceBase implements
+    Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+  static final Logger LOG = Logger.getLogger(SparseMatrixVectorMultMap.class);
+  protected SparseVector currVector;
+  public static final String ITH_ROW = "ith.row";
+  public static final String MATRIX_A = "hama.multiplication.matrix.a";
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+  private IntWritable nKey = new IntWritable();
+
+  /**
+   * Loads row {@link #ITH_ROW} of matrix {@link #MATRIX_A} before any record
+   * is mapped.
+   * 
+   * @throws IllegalStateException if the row cannot be read; configure()
+   *           cannot throw IOException, and carrying on would only fail
+   *           later in map() on the missing row
+   */
+  @Override
+  public void configure(JobConf job) {
+    try {
+      SparseMatrix matrix_a = new SparseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
+      int ithRow = job.getInt(ITH_ROW, 0);
+      nKey.set(ithRow);
+      currVector = matrix_a.getRow(ithRow);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to load row " + job.getInt(ITH_ROW, 0)
+          + " of matrix " + job.get(MATRIX_A), e);
+    }
+  }
+
+  /**
+   * Sets up the map side of a job that multiplies row i of matrix_a by
+   * matrix_b.
+   * 
+   * @param i index of the row of matrix_a handled by the job
+   * @param matrix_a path of the left-hand matrix
+   * @param matrix_b path of the right-hand matrix, used as the job input
+   * @param map the mapper class
+   * @param outputKeyClass the map output key class
+   * @param outputValueClass the map output value class
+   * @param jobConf the job to configure
+   */
+  public static void initJob(int i, String matrix_a, String matrix_b,
+      Class<SparseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
+      Class<MapWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.setInt(ITH_ROW, i);
+    jobConf.set(MATRIX_A, matrix_a);
+    jobConf.set(MATRIX_B, matrix_b);
+
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_b);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  /**
+   * Scales the incoming vector by the entry of the current row of A found at
+   * the index given by key; nothing is emitted when that entry is zero.
+   */
+  @Override
+  public void map(IntWritable key, MapWritable value,
+      OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
+      throws IOException {
+    double ithjth = currVector.get(key.get());
+    // a zero entry of A contributes nothing to the product, so skip it
+    if(ithjth != 0) {
+      SparseVector scaled = new SparseVector(value).scale(ithjth);
+      output.collect(nKey, scaled.getEntries());
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultReduce.java?rev=766595&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SparseMatrixVectorMultReduce.java Mon Apr 20 06:56:05 2009
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SparseVector;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+/**
+ * Reduce side of the row-by-row sparse multiplication C = A*B: sums all the
+ * partial vectors emitted for a row and writes the sum as that row of C.
+ */
+public class SparseMatrixVectorMultReduce extends MapReduceBase implements
+    Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger
+      .getLogger(SparseMatrixVectorMultReduce.class);
+
+  /**
+   * Configures the reduce side of a job: output format, reducer class, output
+   * table and output key/value classes. Call it before submitting the job.
+   * 
+   * @param table the output table, stored as VectorOutputFormat.OUTPUT_TABLE
+   * @param reducer the reducer class
+   * @param job the job to configure
+   */
+  public static void initJob(String table,
+      Class<SparseMatrixVectorMultReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  /**
+   * Adds up every partial vector received for the row key and emits the sum
+   * as a single VectorUpdate for that row.
+   */
+  @Override
+  public void reduce(IntWritable key, Iterator<MapWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    final SparseVector total = new SparseVector();
+    while (values.hasNext()) {
+      final MapWritable partial = values.next();
+      total.add(new SparseVector(partial));
+    }
+
+    final VectorUpdate rowUpdate = new VectorUpdate(key.get());
+    rowUpdate.putAll(total.getEntries());
+    output.collect(key, rowUpdate);
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java Mon Apr 20 06:56:05 2009
@@ -20,6 +20,7 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -29,6 +30,8 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.DenseVector;
+import org.apache.hama.SparseVector;
+import org.apache.hama.Vector;
 import org.apache.hama.util.RandomVariable;
 import org.apache.log4j.Logger;
 
@@ -39,22 +42,56 @@
     Mapper<IntWritable, IntWritable, IntWritable, MapWritable> {
   static final Logger LOG = Logger.getLogger(RandomMatrixMap.class);
   protected int column;
-  protected DenseVector vector = new DenseVector();
-  
+  protected String type;
+  protected Vector vector = new DenseVector();
+
+  /**
+   * Emits one random row vector for every row index from key to value
+   * (inclusive). For "SparseMatrix" each cell is filled with probability 1/2;
+   * otherwise every cell is filled.
+   */
   @Override
   public void map(IntWritable key, IntWritable value,
       OutputCollector<IntWritable, MapWritable> output, Reporter report)
       throws IOException {
-    vector.clear();
-    for (int i = key.get(); i <= value.get(); i++) {
-      for (int j = 0; j < column; j++) {
-        vector.set(j, RandomVariable.rand());
+    if ("SparseMatrix".equals(type)) {
+      SparseVector sparse = (SparseVector) vector;
+      Random r = new Random();
+      for (int i = key.get(); i <= value.get(); i++) {
+        // start every row empty: cells skipped below must not keep the
+        // values written for the previous row
+        sparse.clear();
+        for (int j = 0; j < column; j++) {
+          if (r.nextInt(2) != 0) {
+            sparse.set(j, RandomVariable.rand());
+          }
+        }
+        output.collect(new IntWritable(i), vector.getEntries());
+      }
+    } else {
+      DenseVector dense = (DenseVector) vector;
+      dense.clear();
+      for (int i = key.get(); i <= value.get(); i++) {
+        for (int j = 0; j < column; j++) {
+          dense.set(j, RandomVariable.rand());
+        }
+        output.collect(new IntWritable(i), vector.getEntries());
       }
-      output.collect(new IntWritable(i), vector.getEntries());
     }
   }
 
+  /**
+   * Reads the column count ("matrix.column") and the matrix type
+   * ("matrix.type") and allocates the matching row vector; a missing or
+   * unknown type falls back to a dense vector.
+   */
   public void configure(JobConf job) {
     column = Integer.parseInt(job.get("matrix.column"));
+    type = job.get("matrix.type");
+    if ("SparseMatrix".equals(type)) {
+      vector = new SparseVector();
+    } else {
+      vector = new DenseVector();
+    }
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java Mon Apr 20 06:56:05 2009
@@ -23,11 +23,13 @@
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hama.Constants;
+import org.apache.log4j.Logger;
 
 /**
  * Provides a bytes utility
  */
 public class BytesUtil {
+  static final Logger LOG = Logger.getLogger(BytesUtil.class);
   public static final int SIZEOF_DOUBLE = Double.SIZE/Byte.SIZE;
   public static final int PAD_SIZE = 15; 
   

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java Mon Apr 20 06:56:05 2009
@@ -52,4 +52,39 @@
     JobClient.runJob(jobConf);
   }
   
+  /**
+   * Runs one MapReduce job on its own thread.
+   */
+  public static class MultipleJob extends Thread {
+    private final JobConf jobConf;
+    // set by run() on failure; volatile because other threads read it
+    private volatile IOException failure;
+
+    public MultipleJob(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+
+    /**
+     * @return the exception that made the job fail, or null if the job has
+     *         not failed (or has not finished yet)
+     */
+    public IOException getFailure() {
+      return failure;
+    }
+
+    /**
+     * Runs the job through JobClient.runJob. A failure is recorded for
+     * getFailure() and rethrown unchecked instead of only being printed.
+     */
+    @Override
+    public void run() {
+      try {
+        JobClient.runJob(this.jobConf);
+      } catch (IOException e) {
+        // keep the cause so the code that started this thread can see it
+        failure = e;
+        throw new RuntimeException("MapReduce job failed: " + jobConf.getJobName(), e);
+      }
+    }
+  }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Apr 20 06:56:05 2009
@@ -167,7 +167,7 @@
       }
     }
   }
-  
+
   public void testMultiMatrixAdd() throws IOException {
     Matrix result = ((DenseMatrix)m1).add(m2, m3);
     
@@ -326,7 +326,6 @@
         loadTest3.close();
     }
   }
-
   public void testForceCreate() throws IOException {
     String path2 = m2.getPath();
     // save m2 to aliase2
@@ -342,6 +341,9 @@
 
     assertEquals(path2, loadTest.getPath());
 
+    Matrix test = hamaAdmin.getMatrix(aliase2);
+    assertEquals(test.getType(), "DenseMatrix");
+    
     // force to create matrix loadTest2 using aliasename 'aliase2'
     DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
     String loadPath2 = loadTest2.getPath();

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java?rev=766595&r1=766594&r2=766595&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java Mon Apr 20 06:56:05 2009
@@ -23,6 +23,7 @@
 
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
+import org.apache.hama.SparseMatrix;
 import org.apache.log4j.Logger;
 
 public class TestRandomMatrixMapReduce extends HCluster {
@@ -40,5 +41,18 @@
     }
     
     rand.close();
+    
+    SparseMatrix rand2 = SparseMatrix.random_mapred(conf, 20, 20);
+    assertEquals(20, rand2.getRows());
+    assertEquals(20, rand2.getColumns());
+    boolean zeroAppear = false;
+    for(int i = 0; i < 20; i++) {
+      for(int j = 0; j < 20; j++) {
+        if(rand2.get(i, j) == 0.0)
+          zeroAppear = true;
+      }
+    }
+    assertTrue(zeroAppear);
+    rand2.close();
   }
 }



Mime
View raw message