incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r817156 [2/3] - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/graph/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ sr...
Date Mon, 21 Sep 2009 04:40:15 GMT
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/Matrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/Matrix.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/Matrix.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/Matrix.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,288 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+
+
+/**
+ * Basic matrix interface.
+ */
+public interface Matrix {
+
+  /**
+   * Gets the double value of the entry at (i, j).
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @return the value of the entry
+   * @throws IOException if an I/O error occurs
+   */
+  public double get(int i, int j) throws IOException;
+
+  /**
+   * Gets a row of the matrix as a vector.
+   * 
+   * @param i the row index of the matrix
+   * @return the vector of the row
+   * @throws IOException if an I/O error occurs
+   */
+  public Vector getRow(int i) throws IOException;
+
+  /**
+   * Gets a column of the matrix as a vector.
+   * 
+   * @param j the column index of the matrix
+   * @return the vector of the column
+   * @throws IOException if an I/O error occurs
+   */
+  public Vector getColumn(int j) throws IOException;
+
+  /**
+   * Gets the number of rows of the matrix from the meta-data column.
+   * 
+   * @return the number of rows of the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public int getRows() throws IOException;
+
+  /**
+   * Gets the number of columns of the matrix from the meta-data column.
+   * 
+   * @return the number of columns of the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public int getColumns() throws IOException;
+
+  /**
+   * Gets the label of a row.
+   * 
+   * @param i the row index of the matrix
+   * @return the label of the row
+   * @throws IOException if an I/O error occurs
+   */
+  public String getRowLabel(int i) throws IOException;
+
+  /**
+   * Gets the label of a column.
+   * 
+   * @param j the column index of the matrix
+   * @return the label of the column
+   * @throws IOException if an I/O error occurs
+   */
+  public String getColumnLabel(int j) throws IOException;
+
+  /**
+   * Returns the matrix path.
+   * (In HBase, the path is the table name; in a filesystem, the path may be
+   * a file path.)
+   * 
+   * @return the name of the matrix
+   */
+  public String getPath();
+
+  /**
+   * Sets the label of a row.
+   * 
+   * @param i the row index of the matrix
+   * @param name the label to assign to the row
+   * @throws IOException if an I/O error occurs
+   */
+  public void setRowLabel(int i, String name) throws IOException;
+
+  /**
+   * Sets the label of a column.
+   * 
+   * @param j the column index of the matrix
+   * @param name the label to assign to the column
+   * @throws IOException if an I/O error occurs
+   */
+  public void setColumnLabel(int j, String name) throws IOException;
+
+  /**
+   * Sets the double value of the entry at (i, j).
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @param value the value of the entry
+   * @throws IOException if an I/O error occurs
+   */
+  public void set(int i, int j, double value) throws IOException;
+
+  /**
+   * A = alpha*B
+   * 
+   * @param alpha the scalar factor
+   * @param B the source matrix
+   * @return A
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix set(double alpha, Matrix B) throws IOException;
+
+  /**
+   * A = B
+   * 
+   * @param B the source matrix
+   * @return A
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix set(Matrix B) throws IOException;
+
+  /**
+   * Sets a row of the matrix to the given vector.
+   * 
+   * @param row the row index of the matrix
+   * @param vector the new content of the row
+   * @throws IOException if an I/O error occurs
+   */
+  public void setRow(int row, Vector vector) throws IOException;
+
+  /**
+   * Sets a column of the matrix to the given vector.
+   * 
+   * @param column the column index of the matrix
+   * @param vector the new content of the column
+   * @throws IOException if an I/O error occurs
+   */
+  public void setColumn(int column, Vector vector) throws IOException;
+
+  /**
+   * Sets the dimension of the matrix.
+   * 
+   * @param rows the number of rows
+   * @param columns the number of columns
+   * @throws IOException if an I/O error occurs
+   */
+  public void setDimension(int rows, int columns) throws IOException;
+
+  /**
+   * A(i, j) += value
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @param value the amount to add to the entry
+   * @throws IOException if an I/O error occurs
+   */
+  public void add(int i, int j, double value) throws IOException;
+
+  /**
+   * A = B + A
+   * 
+   * @param B the matrix to add
+   * @return A
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix add(Matrix B) throws IOException;
+
+  /**
+   * A = alpha*B + A
+   * 
+   * @param alpha the scalar factor applied to B
+   * @param B the matrix to add
+   * @return A
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix add(double alpha, Matrix B) throws IOException;
+
+  /**
+   * C = A*B
+   * 
+   * @param B the right-hand operand
+   * @return C
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix mult(Matrix B) throws IOException;
+
+  /**
+   * C = alpha*A*B + C
+   * 
+   * @param alpha the scalar factor applied to A*B
+   * @param B the right-hand operand
+   * @param C the matrix the product is added to
+   * @return C
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException;
+
+  /**
+   * Computes the given norm of the matrix.
+   * 
+   * @param type the norm to compute
+   * @return the norm of the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public double norm(Norm type) throws IOException;
+
+  /**
+   * Supported matrix norms.
+   */
+  enum Norm {
+    /** Maximum absolute column sum */
+    One,
+
+    /** The square root of the sum of squares of the entries */
+    Frobenius,
+    
+    /** Maximum absolute row sum */
+    Infinity,
+    
+    /** Largest entry in absolute value */
+    Maxvalue
+  }
+
+  /**
+   * Transposes the matrix. In most cases, the matrix must be square
+   * for this to work.
+   * 
+   * @return the transposed matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public Matrix transpose() throws IOException;
+  
+  /**
+   * Saves the matrix to a table or file.
+   * 
+   * @param path the table name or file path to save to
+   * @return true if saved
+   * @throws IOException if an I/O error occurs
+   */
+  public boolean save(String path) throws IOException;
+
+  /**
+   * Returns the matrix type.
+   * 
+   * @return the matrix type
+   */
+  public String getType();
+
+  /**
+   * Returns the sub matrix formed by selecting certain rows and
+   * columns from a bigger matrix. The sub matrix is an in-memory operation
+   * only.
+   * 
+   * @param i0 the start index of row
+   * @param i1 the end index of row
+   * @param j0 the start index of column
+   * @param j1 the end index of column
+   * @return the sub matrix of the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException;
+
+  /**
+   * Closes the current matrix.
+   * 
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
+import org.apache.hama.matrix.algebra.SparseMatrixVectorMultMap;
+import org.apache.hama.matrix.algebra.SparseMatrixVectorMultReduce;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+
+public class SparseMatrix extends AbstractMatrix implements Matrix {
+  /** Simple class name; passed to tryToCreateTable and used as the "matrix.type" job setting. */
+  private static final String TABLE_PREFIX = SparseMatrix.class.getSimpleName();
+  /** Relative scratch directory under which the random-matrix job input files are written. */
+  private static final Path TMP_DIR =
+      new Path(SparseMatrix.class.getSimpleName() + "_TMP_dir");
+  
+  /**
+   * Creates an m-by-n sparse matrix.
+   * 
+   * Applies the configuration, asks tryToCreateTable for a table using this
+   * class's table prefix, marks the matrix as not closed and records its
+   * dimension.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows
+   * @param n the number of columns
+   * @throws IOException if an I/O error occurs
+   */
+  public SparseMatrix(HamaConfiguration conf, int m, int n) throws IOException {
+    setConfiguration(conf);
+    tryToCreateTable(TABLE_PREFIX);
+
+    this.closed = false;
+    setDimension(m, n);
+  }
+
+  /**
+   * Loads a matrix from an existing matrix table whose table name is
+   * 'matrixpath'. Intended for internal use by map/reduce code.
+   * 
+   * @param conf configuration object
+   * @param matrixpath the name of the table holding the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public SparseMatrix(HamaConfiguration conf, String matrixpath)
+      throws IOException {
+    setConfiguration(conf);
+    this.matrixPath = matrixpath;
+
+    // Open the table backing the matrix.
+    this.table = new HTable(conf, this.matrixPath);
+
+    // TODO: the reference count of the table is deliberately not incremented
+    // here because this constructor is only used internally by map/reduce.
+    // If it were incremented, there would be no obvious place in the Add & Mul
+    // map/reduce process to call Matrix.close and decrement it again.
+  }
+  
+  /**
+   * Generates a sparse matrix with random elements.
+   * 
+   * Each entry is filled with a value from RandomVariable.rand() with a
+   * probability of one half; the remaining entries are left unset. Rows are
+   * written one at a time through setRow.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix in which about half of the entries hold random
+   *         elements
+   * @throws IOException if an I/O error occurs
+   */
+  public static SparseMatrix random(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    SparseMatrix rand = new SparseMatrix(conf, m, n);
+    SparseVector vector = new SparseVector();
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    // One generator for the whole matrix instead of a new one per element.
+    Random r = new Random();
+    for (int i = 0; i < m; i++) {
+      // The row buffer is reused, so it has to be emptied for every row.
+      vector.clear();
+      for (int j = 0; j < n; j++) {
+        // Coin flip deciding whether entry (i, j) gets a value at all.
+        if (r.nextInt(2) != 0) {
+          vector.set(j, RandomVariable.rand());
+        }
+      }
+      rand.setRow(i, vector);
+    }
+
+    return rand;
+  }
+  
+  /**
+   * Generates an m-by-n random sparse matrix with a map/reduce job.
+   * 
+   * The row range [0, m) is split into one contiguous interval per map task;
+   * each interval is written as a (start, end) pair into its own sequence
+   * file under the temporary input directory, which is removed again once
+   * the job has finished or failed.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows
+   * @param n the number of columns
+   * @param percent value passed to the job as "matrix.density"
+   * @return the generated matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public static SparseMatrix random_mapred(HamaConfiguration conf, int m, int n, double percent) throws IOException {
+    SparseMatrix rand = new SparseMatrix(conf, m, n);
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    final int numMapTasks = conf.getNumMapTasks();
+
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("random matrix MR job : " + rand.getPath());
+
+    jobConf.setNumMapTasks(numMapTasks);
+    jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+    final Path inDir = new Path(TMP_DIR, "in");
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    jobConf.setMapperClass(RandomMatrixMap.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(MapWritable.class);
+
+    RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
+        jobConf);
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInt("matrix.column", n);
+    jobConf.set("matrix.type", TABLE_PREFIX);
+    jobConf.set("matrix.density", String.valueOf(percent));
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    final FileSystem fs = FileSystem.get(jobConf);
+
+    try {
+      int interval = m / numMapTasks;
+
+      // generate an input file for each map task
+      for (int i = 0; i < numMapTasks; ++i) {
+        final Path file = new Path(inDir, "part" + i);
+        final IntWritable start = new IntWritable(i * interval);
+        // The last task also takes the rows left over by the integer division.
+        final IntWritable end;
+        if ((i + 1) != numMapTasks) {
+          end = new IntWritable(((i * interval) + interval) - 1);
+        } else {
+          end = new IntWritable(m - 1);
+        }
+        final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
+            file, IntWritable.class, IntWritable.class, CompressionType.NONE);
+        try {
+          writer.append(start, end);
+        } finally {
+          writer.close();
+        }
+        LOG.info("Wrote input for Map #" + i);
+      }
+
+      JobClient.runJob(jobConf);
+    } finally {
+      // Remove the temporary input files even when the job fails.
+      fs.delete(TMP_DIR, true);
+    }
+    return rand;
+  }
+  
+  /**
+   * A = B + A. Not implemented yet for sparse matrices.
+   * 
+   * @param B the matrix to add
+   * @return always null
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public Matrix add(Matrix B) throws IOException {
+    // TODO: placeholder, no addition is performed
+    return null;
+  }
+
+  /**
+   * A = alpha*B + A. Not implemented yet for sparse matrices.
+   * 
+   * @param alpha the scalar factor applied to B
+   * @param B the matrix to add
+   * @return always null
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public Matrix add(double alpha, Matrix B) throws IOException {
+    // TODO: placeholder, no addition is performed
+    return null;
+  }
+
+  /**
+   * Gets the double value of (i, j).
+   * 
+   * @param i the row index, 0 &lt;= i &lt; getRows()
+   * @param j the column index, 0 &lt;= j &lt; getColumns()
+   * @return the stored value, or 0.0 when the table holds no cell for (i, j)
+   * @throws ArrayIndexOutOfBoundsException if i or j lies outside the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public double get(int i, int j) throws IOException {
+    // Indices are zero-based, so the dimension itself is already out of range.
+    if (i < 0 || i >= this.getRows() || j < 0 || j >= this.getColumns()) {
+      throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+    }
+
+    Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j));
+    // A missing cell is an implicit zero of the sparse matrix.
+    return (c != null) ? BytesUtil.bytesToDouble(c.getValue()) : 0.0;
+  }
+
+  /**
+   * Gets the vector of column. Not implemented yet for sparse matrices.
+   * 
+   * @param j the column index of the matrix
+   * @return always null
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public Vector getColumn(int j) throws IOException {
+    // TODO: placeholder, no column is read
+    return null;
+  }
+
+  /**
+   * Reads one row of the matrix from the table and wraps it in a sparse
+   * vector. Only the Constants.COLUMN column is requested.
+   * 
+   * @param i the row index of the matrix
+   * @return the row as a sparse vector
+   * @throws IOException if an I/O error occurs
+   */
+  public SparseVector getRow(int i) throws IOException {
+    byte[] rowKey = BytesUtil.getRowIndex(i);
+    byte[][] columns = new byte[][] { Bytes.toBytes(Constants.COLUMN) };
+    return new SparseVector(table.getRow(rowKey, columns));
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * A value of zero is not written: the call returns without committing
+   * anything to the table.
+   */
+  public void set(int i, int j, double value) throws IOException {
+    if (value == 0) {
+      return;
+    }
+
+    VectorUpdate rowUpdate = new VectorUpdate(i);
+    rowUpdate.put(j, value);
+    table.commit(rowUpdate.getBatchUpdate());
+  }
+  
+  /**
+   * Returns the matrix type, which is the simple name of the runtime class.
+   * 
+   * @return the simple class name of this matrix
+   */
+  public String getType() {
+    Class<?> runtimeClass = this.getClass();
+    return runtimeClass.getSimpleName();
+  }
+
+  /**
+   * C = A*B using an iterative method: one map/reduce job is configured and
+   * executed for every row index of this matrix.
+   * 
+   * @param B the right-hand operand
+   * @return C, a new matrix with the rows of A and the columns of B
+   * @throws IOException if an I/O error occurs
+   */
+  public SparseMatrix mult(Matrix B) throws IOException {
+    // Read the row count once; getRows() may throw IOException per call.
+    final int rows = this.getRows();
+    // The product of (rows x k) and (k x columns of B) is (rows x columns of B).
+    SparseMatrix result = new SparseMatrix(config, rows, B.getColumns());
+
+    for (int i = 0; i < rows; i++) {
+      JobConf jobConf = new JobConf(config);
+      jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i);
+
+      jobConf.setNumMapTasks(config.getNumMapTasks());
+      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+      SparseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), SparseMatrixVectorMultMap.class,
+          IntWritable.class, MapWritable.class, jobConf);
+      SparseMatrixVectorMultReduce.initJob(result.getPath(), SparseMatrixVectorMultReduce.class,
+          jobConf);
+      JobManager.execute(jobConf);
+    }
+
+    return result;
+  }
+
+  /**
+   * C = alpha*A*B + C. Not implemented yet for sparse matrices.
+   * 
+   * @param alpha the scalar factor applied to A*B
+   * @param B the right-hand operand
+   * @param C the matrix the product would be added to
+   * @return always null
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
+    // TODO: placeholder, nothing is computed
+    return null;
+  }
+
+  /**
+   * Computes the requested norm of the matrix by delegating to the matching
+   * helper. Any type other than One, Frobenius or Infinity falls through to
+   * the max-value norm.
+   * 
+   * @param type the norm to compute
+   * @return norm of the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  public double norm(Norm type) throws IOException {
+    if (type == Norm.One) {
+      return getNorm1();
+    }
+    if (type == Norm.Frobenius) {
+      return getFrobenius();
+    }
+    if (type == Norm.Infinity) {
+      return getInfinity();
+    }
+    return getMaxvalue();
+  }
+
+  /**
+   * Sets the column of a matrix to a given vector. Not implemented yet for
+   * sparse matrices: the call has no effect.
+   * 
+   * @param column the column index of the matrix
+   * @param vector the new content of the column (currently ignored)
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public void setColumn(int column, Vector vector) throws IOException {
+    // TODO: placeholder, nothing is written
+  }
+
+  /**
+   * Sets the row of the matrix to the given vector. A vector whose size() is
+   * zero is not committed at all.
+   * 
+   * @param row the row index, 0 &lt;= row &lt; getRows()
+   * @param vector the new content of the row; must be a SparseVector
+   * @throws ArrayIndexOutOfBoundsException if row lies outside the matrix
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void setRow(int row, Vector vector) throws IOException {
+    // Indices are zero-based, so the row count itself is already out of range.
+    if (row < 0 || row >= this.getRows()) {
+      throw new ArrayIndexOutOfBoundsException(row);
+    }
+
+    if (vector.size() > 0) { // stores if size > 0
+      VectorUpdate update = new VectorUpdate(row);
+      update.putAll(((SparseVector) vector).getEntries());
+      table.commit(update.getBatchUpdate());
+    }
+  }
+
+  /**
+   * Returns a sub matrix. Not implemented yet for sparse matrices.
+   * 
+   * @param i0 the start index of row
+   * @param i1 the end index of row
+   * @param j0 the start index of column
+   * @param j1 the end index of column
+   * @return always null
+   * @throws IOException declared by the interface; never thrown here
+   */
+  @Override
+  public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
+    // TODO: placeholder, no sub matrix is built
+    return null;
+  }
+
+}
\ No newline at end of file

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseVector.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseVector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseVector.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,186 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.matrix.Vector.Norm;
+import org.apache.log4j.Logger;
+
+/**
+ * This class represents a sparse vector.
+ */
+public class SparseVector extends AbstractVector implements Vector {
+  static final Logger LOG = Logger.getLogger(SparseVector.class);
+
+  /**
+   * Creates a sparse vector backed by a new, empty MapWritable.
+   */
+  public SparseVector() {
+    this(new MapWritable());
+  }
+
+  /**
+   * Creates a sparse vector that uses the given map as its entries. The map
+   * is stored as-is, not copied.
+   * 
+   * @param m the entries of the vector
+   */
+  public SparseVector(MapWritable m) {
+    entries = m;
+  }
+
+  /**
+   * Creates a sparse vector from a table row by handing it to initMap.
+   * 
+   * @param row the row result to initialise the entries from
+   */
+  public SparseVector(RowResult row) {
+    initMap(row);
+  }
+
+  /**
+   * x = alpha*v + x
+   * 
+   * Walks the entries of v; an index already present in this vector gets the
+   * scaled value added to it, any other index is inserted with the scaled
+   * value. Nothing happens when alpha is zero.
+   * 
+   * @param alpha the scalar factor applied to v
+   * @param v the vector to add
+   * @return this vector
+   */
+  @Override
+  public Vector add(double alpha, Vector v) {
+    if (alpha == 0) {
+      return this;
+    }
+
+    for (Map.Entry<Writable, Writable> entry : v.getEntries().entrySet()) {
+      Writable key = entry.getKey();
+      double scaled = alpha * ((DoubleEntry) entry.getValue()).getValue();
+      double updated = this.entries.containsKey(key)
+          ? scaled + this.get(((IntWritable) key).get())
+          : scaled;
+      this.entries.put(key, new DoubleEntry(updated));
+    }
+
+    return this;
+  }
+
+  /**
+   * x = v + x
+   * 
+   * Walks the entries of v2; an index already present in this vector is
+   * updated through add(int, double), any other index through
+   * set(int, double).
+   * 
+   * @param v2 the vector to add
+   * @return this vector
+   */
+  public SparseVector add(Vector v2) {
+    for (Map.Entry<Writable, Writable> entry : v2.getEntries().entrySet()) {
+      Writable rawKey = entry.getKey();
+      int index = ((IntWritable) rawKey).get();
+      if (!this.entries.containsKey(rawKey)) {
+        this.set(index, ((DoubleEntry) entry.getValue()).getValue());
+      } else {
+        this.add(index, ((DoubleEntry) entry.getValue()).getValue());
+      }
+    }
+    return this;
+  }
+
+  /**
+   * x dot v. Not implemented yet for sparse vectors.
+   * 
+   * @param v the other vector (currently ignored)
+   * @return always 0
+   */
+  @Override
+  public double dot(Vector v) {
+    // TODO: placeholder, no product is computed
+    return 0;
+  }
+
+  /**
+   * Computes the given norm of the vector. Not implemented yet for sparse
+   * vectors.
+   * 
+   * @param type the norm to compute (currently ignored)
+   * @return always 0
+   */
+  @Override
+  public double norm(Norm type) {
+    // TODO: placeholder, no norm is computed
+    return 0;
+  }
+
+  /**
+   * v = alpha*v
+   * 
+   * Replaces every stored entry with a new DoubleEntry holding the old value
+   * multiplied by alpha.
+   * 
+   * @param alpha the scalar factor
+   * @return this vector
+   */
+  public SparseVector scale(double alpha) {
+    for (Map.Entry<Writable, Writable> entry : this.entries.entrySet()) {
+      double current = ((DoubleEntry) entry.getValue()).getValue();
+      DoubleEntry scaled = new DoubleEntry(current * alpha);
+      // Re-putting an existing key only swaps the value of that entry.
+      this.entries.put(entry.getKey(), scaled);
+    }
+    return this;
+  }
+
+  /**
+   * Gets the value of index.
+   * 
+   * @param index the index to look up
+   * @return the value of v(index), or 0 when no value is stored for it
+   */
+  public double get(int index) {
+    // No map at all behaves like an empty vector.
+    if (this.entries == null) {
+      return 0;
+    }
+
+    // Explicit absence check instead of catching NullPointerException.
+    Writable stored = this.entries.get(new IntWritable(index));
+    if (stored == null) {
+      return 0;
+    }
+    return ((DoubleEntry) stored).getValue();
+  }
+
+  /**
+   * Sets the value of index. Only non-zero elements are stored: setting an
+   * index to zero removes any entry previously stored for it.
+   * 
+   * @param index the index to set
+   * @param value the new value of v(index)
+   */
+  public void set(int index, double value) {
+    // If entries are null, create new object
+    if (this.entries == null) {
+      this.entries = new MapWritable();
+    }
+
+    IntWritable key = new IntWritable(index);
+    if (value != 0) {
+      this.entries.put(key, new DoubleEntry(value));
+    } else {
+      // Without this, a stale non-zero value would survive a set to zero.
+      this.entries.remove(key);
+    }
+  }
+
+  /**
+   * Adds the value to v(index) by reading the current value with get and
+   * writing the sum back with set.
+   * 
+   * @param index the index to update
+   * @param value the amount to add
+   */
+  public void add(int index, double value) {
+    double sum = get(index) + value;
+    set(index, sum);
+  }
+
+  /**
+   * Sets this vector to v (x = v). The entries of v are copied into a new
+   * map, so later changes to either vector's map do not affect the other.
+   * 
+   * @param v the vector to copy the entries from
+   * @return this vector, now holding the entries of v
+   */
+  public SparseVector set(Vector v) {
+    MapWritable copy = new MapWritable();
+    MapWritable source = v.getEntries();
+    if (source != null) {
+      copy.putAll(source);
+    }
+    // Replace the receiver's entries instead of handing back a new vector.
+    this.entries = copy;
+    return this;
+  }
+
+  /**
+   * Returns a sub-vector. Not implemented yet for sparse vectors.
+   * 
+   * @param i0 the index of the first element
+   * @param i1 the index of the last element
+   * @return always null
+   */
+  @Override
+  public Vector subVector(int i0, int i1) {
+    // TODO: placeholder, no sub-vector is built
+    return null;
+  }
+
+  /**
+   * x = alpha * v. Not implemented yet for sparse vectors.
+   * 
+   * @param alpha the scalar factor (currently ignored)
+   * @param v the source vector (currently ignored)
+   * @return always null
+   */
+  @Override
+  public Vector set(double alpha, Vector v) {
+    // TODO: placeholder, nothing is assigned
+    return null;
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/SubMatrix.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/SubMatrix.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/SubMatrix.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * A sub matrix is a matrix formed by selecting certain rows and columns from a
+ * bigger matrix. This is a in-memory operation only.
+ */
+public class SubMatrix {
+  static final Logger LOG = Logger.getLogger(SubMatrix.class);
+  private double[][] matrix;
+
+  /**
+   * Creates an i-by-j sub matrix backed by a newly allocated array.
+   * 
+   * @param i the number of rows
+   * @param j the number of columns
+   */
+  public SubMatrix(int i, int j) {
+    double[][] cells = new double[i][j];
+    this.matrix = cells;
+  }
+
+  /**
+   * Creates a sub matrix on top of the given array. The array is used
+   * directly, not copied.
+   * 
+   * @param c a two dimensional double array
+   */
+  public SubMatrix(double[][] c) {
+    this.matrix = c;
+  }
+
+  /**
+   * Creates a sub matrix from its byte form: two ints (rows, columns)
+   * followed by rows*columns doubles in row-major order.
+   * 
+   * @param matrix the serialized sub matrix
+   * @throws IOException if the bytes cannot be read
+   */
+  public SubMatrix(byte[] matrix) throws IOException {
+    ByteArrayInputStream rawIn = new ByteArrayInputStream(matrix);
+    DataInputStream in = new DataInputStream(rawIn);
+
+    final int rowCount = in.readInt();
+    final int columnCount = in.readInt();
+    double[][] cells = new double[rowCount][columnCount];
+
+    for (double[] row : cells) {
+      for (int c = 0; c < columnCount; c++) {
+        row[c] = in.readDouble();
+      }
+    }
+    this.matrix = cells;
+
+    in.close();
+    rawIn.close();
+  }
+  
+  /**
+   * Stores a value at (row, column).
+   * 
+   * @param row the row index
+   * @param column the column index
+   * @param value the value to store
+   */
+  public void set(int row, int column, double value) {
+    double[] target = matrix[row];
+    target[column] = value;
+  }
+
+  /**
+   * Stores a value at (row, column), decoding it from bytes with
+   * BytesUtil.bytesToDouble.
+   * 
+   * @param row the row index
+   * @param column the column index
+   * @param value the encoded value to store
+   */
+  public void set(int row, int column, byte[] value) {
+    double decoded = BytesUtil.bytesToDouble(value);
+    matrix[row][column] = decoded;
+  }
+  
+  /**
+   * Reads the value at (i, j).
+   * 
+   * @param i the row index
+   * @param j the column index
+   * @return the value of submatrix(i, j)
+   */
+  public double get(int i, int j) {
+    double[] row = matrix[i];
+    return row[j];
+  }
+
+  /**
+   * Adds the value to the entry at (row, column).
+   * 
+   * @param row the row index
+   * @param column the column index
+   * @param value the amount to add
+   */
+  public void add(int row, int column, double value) {
+    matrix[row][column] += value;
+  }
+
+  /**
+   * c = a+b, computed entry by entry over the dimensions of this sub matrix.
+   * Neither operand is modified.
+   * 
+   * @param b the sub matrix to add
+   * @return c, a new sub matrix holding the sums
+   */
+  public SubMatrix add(SubMatrix b) {
+    SubMatrix sum = new SubMatrix(this.getRows(), this.getColumns());
+
+    for (int r = 0; r < this.getRows(); r++) {
+      for (int col = 0; col < this.getColumns(); col++) {
+        double left = this.get(r, col);
+        double right = b.get(r, col);
+        sum.set(r, col, left + right);
+      }
+    }
+
+    return sum;
+  }
+
+  /**
+   * c = a*b, computed with the plain triple loop. Neither operand is
+   * modified.
+   * 
+   * @param b the right-hand operand
+   * @return c, a new sub matrix with the rows of a and the columns of b
+   */
+  public SubMatrix mult(SubMatrix b) {
+    SubMatrix product = new SubMatrix(this.getRows(), b.getColumns());
+
+    for (int r = 0; r < this.getRows(); r++) {
+      for (int col = 0; col < b.getColumns(); col++) {
+        // Accumulate the dot product of row r of a and column col of b.
+        double dot = 0.0;
+        for (int k = 0; k < this.getColumns(); k++) {
+          dot = dot + this.get(r, k) * b.get(k, col);
+        }
+        product.set(r, col, dot);
+      }
+    }
+
+    return product;
+  }
+
+  /**
+   * Returns the number of rows, i.e. the length of the backing array.
+   * 
+   * @return the number of rows
+   */
+  public int getRows() {
+    return matrix.length;
+  }
+
+  /**
+   * Returns the number of columns, taken from the length of the first row of
+   * the backing array.
+   * 
+   * @return the number of columns
+   */
+  public int getColumns() {
+    double[] firstRow = matrix[0];
+    return firstRow.length;
+  }
+
+  /**
+   * Closes the sub matrix by dropping the reference to the backing array.
+   */
+  public void close() {
+    this.matrix = null;
+  }
+
+  /**
+   * Returns the backing array itself, not a copy.
+   * 
+   * @return the 2d double array
+   */
+  public double[][] getDoubleArray() {
+    return this.matrix;
+  }
+
+  /**
+   * Serializes the sub matrix: two ints (rows, columns) followed by every
+   * entry as a double in row-major order.
+   * 
+   * @return the bytes of the sub matrix
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  public byte[] getBytes() throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(buffer);
+
+    // Header: the dimensions, rows first.
+    out.writeInt(this.getRows());
+    out.writeInt(this.getColumns());
+
+    // Payload: the entries, row by row.
+    for (int r = 0; r < this.getRows(); r++) {
+      for (int c = 0; c < this.getColumns(); c++) {
+        double cell = this.get(r, c);
+        out.writeDouble(cell);
+      }
+    }
+
+    byte[] serialized = buffer.toByteArray();
+    out.close();
+    buffer.close();
+    return serialized;
+  }
+
+  /**
+   * Renders the sub matrix as text: every entry is followed by a tab and
+   * every row is terminated by a newline.
+   * 
+   * @return the textual form of the sub matrix
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    for (int r = 0; r < this.getRows(); r++) {
+      for (int c = 0; c < this.getColumns(); c++) {
+        text.append(this.get(r, c)).append('\t');
+      }
+      text.append('\n');
+    }
+    return text.toString();
+  }
+}
+

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/Vector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/Vector.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/Vector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/Vector.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Basic vector interface.
+ */
+public interface Vector {
+
+  /**
+   * Returns the size of the vector.
+   * 
+   * @return the size of the vector
+   */
+  public int size();
+
+  /**
+   * Gets the value at the given index.
+   * 
+   * @param index the index to read
+   * @return v(index)
+   */
+  public double get(int index);
+
+  /**
+   * Sets the value at the given index.
+   * 
+   * @param index the index to write
+   * @param value the new value of v(index)
+   */
+  public void set(int index, double value);
+
+  /**
+   * x = v
+   * 
+   * @param v the source vector
+   * @return x = v
+   */
+  public Vector set(Vector v);
+
+  /**
+   * x = alpha * v
+   * 
+   * @param alpha the scalar factor
+   * @param v the source vector
+   * @return x = alpha * v
+   */
+  public Vector set(double alpha, Vector v);
+  
+  /**
+   * Adds the value to v(index).
+   * 
+   * @param index the index to update
+   * @param value the amount to add
+   */
+  public void add(int index, double value);
+
+  /**
+   * x = alpha*v + x
+   * 
+   * @param alpha the scalar factor applied to v
+   * @param v the vector to add
+   * @return x = alpha*v + x
+   */
+  public Vector add(double alpha, Vector v);
+
+  /**
+   * x = v + x
+   * 
+   * @param v the vector to add
+   * @return x = v + x
+   */
+  public Vector add(Vector v);
+
+  /**
+   * x dot v
+   * 
+   * @param v the other operand of the dot product
+   * @return x dot v
+   */
+  public double dot(Vector v);
+
+  /**
+   * v = alpha*v
+   * 
+   * @param alpha the scalar factor
+   * @return v = alpha*v
+   */
+  public Vector scale(double alpha);
+  
+  /**
+   * Returns a sub-vector.
+   * 
+   * @param i0 the index of the first element
+   * @param i1 the index of the last element
+   * @return v[i0:i1]
+   */
+  public Vector subVector( int i0, int i1 ); 
+  
+  /**
+   * Computes the given norm of the vector.
+   * 
+   * @param type the norm to compute
+   * @return the norm of the vector
+   */
+  public double norm(Norm type);
+
+  /**
+   * Supported vector norms.
+   */
+  enum Norm {
+
+    /** Sum of the absolute values of the entries */
+    One,
+
+    /** The square root of the sum of squares of the entries */
+    Two,
+
+    /** The robust norm of the vector */
+    TwoRobust,
+
+    /** Largest entry in absolute value */
+    Infinity
+  }
+
+  /**
+   * Returns an iterator.
+   * 
+   * @return an iterator of {@link org.apache.hadoop.io.Writable} elements
+   */
+  public Iterator<Writable> iterator();
+  
+  /**
+   * Returns the {@link org.apache.hadoop.io.MapWritable} holding the entries.
+   * 
+   * @return the entries of the vector
+   */
+  public MapWritable getEntries();
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockInputFormat;
+import org.apache.hama.matrix.SubMatrix;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyMap extends MapReduceBase implements
+    Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockMultiplyMap.class);
+
+  public static void initJob(String matrix_a,
+      Class<BlockMultiplyMap> map, Class<BlockID> outputKeyClass,
+      Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+
+    jobConf.setInputFormat(BlockInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_a);
+
+    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+  }
+
+  @Override
+  public void map(BlockID key, BlockWritable value,
+      OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+      throws IOException {
+    SubMatrix c = value.get(0).mult(value.get(1));
+    output.collect(key, new BlockWritable(c));
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.hama.matrix.SubMatrix;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyReduce extends MapReduceBase implements
+    Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(BlockMultiplyReduce.class);
+
+  /**
+   * Use this before submitting a BlockMultiplyReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table value stored under VectorOutputFormat.OUTPUT_TABLE
+   * @param reducer the reducer class to register on the job
+   * @param job the JobConf to configure
+   */
+  public static void initJob(String table,
+      Class<BlockMultiplyReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class); // NOTE(review): reduce() emits VectorUpdate - confirm
+  }
+
+  @Override
+  public void reduce(BlockID key, Iterator<BlockWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    // Sums the first sub-matrix of every value for this block, then emits it row by row.
+    SubMatrix s = null;
+    while (values.hasNext()) {
+      SubMatrix b = values.next().getMatrices().next();
+      if (s == null) {
+        s = b;
+      } else {
+        s = s.add(b);
+      }
+    }
+    // Offsets are the block indices scaled by the dimensions of the summed block.
+    int startRow = key.getRow() * s.getRows();
+    int startColumn = key.getColumn() * s.getColumns();
+    // One VectorUpdate per row of the block, addressed by row index i + startRow.
+    for (int i = 0; i < s.getRows(); i++) {
+      VectorUpdate update = new VectorUpdate(i + startRow);
+      for (int j = 0; j < s.getColumns(); j++) {
+        update.put(j + startColumn, s.get(i, j));
+      }
+      output.collect(new IntWritable(key.getRow()), update); // NOTE(review): keyed by block row, not i + startRow - confirm
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.log4j.Logger;
+
+public class DenseMatrixVectorMultMap extends MapReduceBase implements
+    Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultMap.class);
+  protected DenseVector currVector;
+  public static final String ITH_ROW = "ith.row";
+  public static final String MATRIX_A = "hama.multiplication.matrix.a";
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+  private IntWritable nKey = new IntWritable();
+  
+  /** Loads row ITH_ROW of matrix MATRIX_A into currVector; fails the task if it cannot be read. */
+  public void configure(JobConf job) {
+    try {
+      DenseMatrix matrix_a = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
+      int ithRow = job.getInt(ITH_ROW, 0);
+      nKey.set(ithRow);
+      currVector = matrix_a.getRow(ithRow);
+    } catch (IOException e) { // without currVector, map() could only fail later with a NullPointerException
+      throw new RuntimeException("Failed to load row of matrix '" + job.get(MATRIX_A, "") + "'", e);
+    }
+  }
+
+  public static void initJob(int i, String matrix_a, String matrix_b,
+      Class<DenseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
+      Class<MapWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.setInt(ITH_ROW, i);
+    jobConf.set(MATRIX_A, matrix_a);
+    jobConf.set(MATRIX_B, matrix_b);
+    
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_b);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  @Override
+  public void map(IntWritable key, MapWritable value,
+      OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
+      throws IOException {
+
+    DenseVector scaled = new DenseVector(value).scale(currVector.get(key.get()));
+    output.collect(nKey, scaled.getEntries());
+    
+  }
+}
\ No newline at end of file

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.log4j.Logger;
+
+public class DenseMatrixVectorMultReduce extends MapReduceBase implements
+    Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultReduce.class);
+  
+  /**
+   * Use this before submitting a DenseMatrixVectorMultReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table value stored under VectorOutputFormat.OUTPUT_TABLE
+   * @param reducer the reducer class to register on the job
+   * @param job the JobConf to configure
+   */
+  public static void initJob(String table,
+      Class<DenseMatrixVectorMultReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class); // NOTE(review): reduce() emits VectorUpdate - confirm
+  }
+  
+  /** Adds up all vectors received for one key and emits the total as one VectorUpdate. */
+  @Override
+  public void reduce(IntWritable key, Iterator<MapWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    DenseVector total = new DenseVector();
+
+    while (values.hasNext()) {
+      DenseVector partial = new DenseVector(values.next());
+      if (total.size() == 0) {
+        // the accumulator is still empty: size it from the incoming vector
+        total.zeroFill(partial.size());
+      }
+      total.add(partial);
+    }
+
+    VectorUpdate rowUpdate = new VectorUpdate(key.get());
+    rowUpdate.putAll(total.getEntries());
+
+    output.collect(key, rowUpdate);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,583 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.HTableInputFormatBase;
+import org.apache.hama.mapred.HTableRecordReaderBase;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A catalog class that collects all the map/reduce classes used to compute a
+ * matrix's eigenvalues.
+ */
+public class JacobiEigenValue {
+
+  /** a matrix copy of the original copy collected in "eicol" family * */
+  public static final String EICOL = "eicol:";
+  /** a column family collect all values and statuses used during computation * */
+  public static final String EI = "eival:";
+  /** a column collect all the eigen values * */
+  public static final String EIVAL = EI + "value";
+  /** a column identify whether the eigen values have been changed * */
+  public static final String EICHANGED = EI + "changed";
+  /** a column identify the index of the max absolute value each row * */
+  public static final String EIIND = EI + "ind";
+  /** a matrix collect all the eigen vectors * */
+  public static final String EIVEC = "eivec:";
+  public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+  /** parameters for pivot * */
+  public static final String PIVOTROW = "hama.jacobi.pivot.row";
+  public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+  public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+  public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+  static final Log LOG = LogFactory.getLog(JacobiEigenValue.class);
+
+  /**
+   * The matrix will be modified during computing eigen value. So a new matrix
+   * will be created to prevent the original matrix being modified. To reduce
+   * the network transfer, we copy the "column" family in the original matrix to
+   * a "eicol" family. All the following modification will be done over "eicol"
+   * family.
+   * 
+   * And the output Eigen Vector Arrays "eivec", and the output eigen value
+   * array "eival:value", and the temp status array "eival:changed", "eival:ind"
+   * will be created.
+   * 
+   * Also "eival:state" will record the state of the rotation state of a matrix
+   */
+  public static class InitMapper extends MapReduceBase implements
+      Mapper<IntWritable, MapWritable, NullWritable, NullWritable> {
+
+    HTable table;
+
+    /** Opens the HTable named by the MATRIX job property; fails the task if it cannot be opened. */
+    @Override
+    public void configure(JobConf job) {
+      String tableName = job.get(MATRIX, "");
+      try {
+        table = new HTable(new HBaseConfiguration(job), tableName);
+      } catch (IOException e) { // without the table, map() could only fail later with a NullPointerException
+        throw new RuntimeException("Failed to open table '" + tableName + "'", e);
+      }
+    }
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+        throws IOException {
+      int row, col;
+      row = key.get();
+      VectorUpdate vu = new VectorUpdate(row);
+      // Builds one update for this row and commits it to the table; nothing is collected.
+      double val;
+      double maxVal = Double.MIN_VALUE; // replaced by the first entry with col > row
+      int maxInd = row + 1; // kept when the row has no entry with col > row
+
+      boolean init = true;
+      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+        val = ((DoubleEntry) e.getValue()).getValue();
+        col = ((IntWritable) e.getKey()).get();
+        // copy the original matrix to "EICOL" family
+        vu.put(JacobiEigenValue.EICOL, col, val);
+        // initialize "EIVEC" with 1 on the diagonal and 0 elsewhere
+        vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0);
+        if (col == row) {
+          vu.put(JacobiEigenValue.EIVAL, val);
+        }
+        // track the column with col > row that holds the largest absolute value
+        if (col > row) {
+          if (init) {
+            maxInd = col;
+            maxVal = val;
+            init = false;
+          } else {
+            if (Math.abs(val) > Math.abs(maxVal)) {
+              maxVal = val;
+              maxInd = col;
+            }
+          }
+        }
+      }
+      // index array: column of that largest off-diagonal value
+      vu.put(JacobiEigenValue.EIIND, maxInd);
+      // the "changed" flag is set to 1 (true) during initialization
+      vu.put(JacobiEigenValue.EICHANGED, 1);
+
+      table.commit(vu.getBatchUpdate());
+    }
+
+  }
+
+  /**
+   * PivotInputFormat & PivotMapper & PivotReducer are used to find the pivot in
+   * a matrix
+   */
+  public static class PivotInputFormat extends HTableInputFormatBase implements
+      InputFormat<Pair, DoubleWritable>, JobConfigurable {
+
+    private PivotRecordReader tableRecordReader;
+
+    protected static class PivotRecordReader extends HTableRecordReaderBase
+        implements RecordReader<Pair, DoubleWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      private int size;
+      boolean mocked = true;
+
+      @Override
+      public void init() throws IOException {
+        super.init();
+
+        Cell rows = null;
+        rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0; // so we just skip it.
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+          } else {
+            totalRows = BytesUtil.bytesToInt(endRow)
+                - BytesUtil.bytesToInt(startRow);
+          }
+        }
+        processedRows = 0;
+        LOG.info("Split (" + Bytes.toString(startRow) + ", "
+            + Bytes.toString(endRow) + ") -> " + totalRows);
+      }
+
+      /**
+       * @return Pair
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public Pair createKey() {
+        return new Pair();
+      }
+
+      /**
+       * @return DoubleWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public DoubleWritable createValue() {
+        return new DoubleWritable();
+      }
+
+      /**
+       * @param key Pair as input key.
+       * @param value DoubleWritable as input value
+       * 
+       * Converts Scanner.next() to Pair, DoubleWritable
+       * 
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(Pair key, DoubleWritable value) throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId == size - 1) { // skip the last row
+            if (mocked) {
+              key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+              mocked = false;
+              return true;
+            } else {
+              return false;
+            }
+          }
+
+          byte[] col = result.get(EIIND).getValue();
+          int colId = BytesUtil.bytesToInt(col);
+          double val = 0;
+
+          // get (rowId, colId)'s value
+          Cell cell = htable.get(BytesUtil.getRowIndex(rowId), Bytes
+              .toBytes(EICOL + colId));
+          if (cell != null && cell.getValue() != null) {
+            val = BytesUtil.bytesToDouble(cell.getValue());
+          }
+
+          key.set(rowId, colId);
+          value.set(val);
+
+          lastRow = row;
+          processedRows++;
+        } else {
+          if (mocked) {
+            key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+            mocked = false;
+            return true;
+          } else {
+            return false;
+          }
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    @Override
+    public RecordReader<Pair, DoubleWritable> getRecordReader(InputSplit split,
+        JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      PivotRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new PivotRecordReader();
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(PivotRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+
+  // find the pivot of the matrix
+  public static class PivotMapper extends MapReduceBase implements
+      Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+    private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+    private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+    @Override
+    public void map(Pair key, DoubleWritable value,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      if (key.getRow() != Integer.MAX_VALUE) {
+        if (Math.abs(value.get()) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = value.get();
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+        collector.collect(dummyPair, dummyVal);
+      }
+    }
+
+  }
+
+  public static class PivotReducer extends MapReduceBase implements
+      Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+
+    @Override
+    public void reduce(Pair key, Iterator<DoubleWritable> values,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      double val;
+      if (key.getRow() != Integer.MAX_VALUE) {
+        val = values.next().get();
+        if (Math.abs(val) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = val;
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+      }
+    }
+
+  }
+
+  /**
+   * Tricky here! The matrix is rotated and updated in place while it is being
+   * scanned, so a RotationRecordReader does both the scanning and the
+   * rotation; the mapper and reducer are just dummies.
+   */
+  public static class RotationInputFormat extends HTableInputFormatBase
+      implements InputFormat<NullWritable, NullWritable>, JobConfigurable {
+
+    private RotationRecordReader tableRecordReader;
+
+    int pivot_row, pivot_col;
+    double pivot_cos, pivot_sin;
+
+    public void configure(JobConf job) {
+      super.configure(job);
+      pivot_row = job.getInt(PIVOTROW, -1);
+      pivot_col = job.getInt(PIVOTCOL, -1);
+      pivot_sin = Double.parseDouble(job.get(PIVOTSIN));
+      pivot_cos = Double.parseDouble(job.get(PIVOTCOS));
+    }
+
+    protected static class RotationRecordReader extends HTableRecordReaderBase
+        implements RecordReader<NullWritable, NullWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      int startRowId, endRowId = -1;
+      int size;
+
+      int pivotrow, pivotcol;
+      byte[] prow, pcol;
+      double pivotcos, pivotsin;
+
+      public RotationRecordReader(int pr, int pc, double psin, double pcos) { // pivot row, column, sine, cosine
+        super();
+        pivotrow = pr;
+        pivotcol = pc;
+        pivotsin = psin;
+        pivotcos = pcos;
+        prow = Bytes.toBytes(pivotrow);
+        pcol = Bytes.toBytes(pivotcol);
+        // log the indices themselves: passing the byte[] would print only the array identity
+        LOG.info("Pivot (" + pivotrow + ", " + pivotcol + ")");
+      }
+
+      /**
+       * Reads the matrix size from the metadata row and derives the row-id
+       * range and row count of this split; the count feeds getProgress().
+       */
+      @Override
+      public void init() throws IOException {
+        super.init();
+        Cell rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0; // so progress is reported as 0 for this split
+          if (startRow.length == 0)
+            startRowId = 0;
+          else
+            startRowId = BytesUtil.bytesToInt(startRow);
+          endRowId = -1;
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+            startRowId = 0;
+            endRowId = totalRows;
+          } else {
+            startRowId = BytesUtil.bytesToInt(startRow);
+            endRowId = BytesUtil.bytesToInt(endRow);
+            totalRows = endRowId - startRowId; // end minus start, so the count is not negative
+          }
+        }
+        processedRows = 0;
+        LOG.info("Split (" + startRowId + ", " + endRowId + ") -> "
+            + totalRows);
+      }
+
+      /**
+       * @return NullWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @return NullWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public NullWritable createValue() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @param key NullWritable as input key.
+       * @param value NullWritable as input value
+       * 
+       * Converts Scanner.next() to NullWritable, NullWritable
+       * 
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(NullWritable key, NullWritable value)
+          throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        double s1, s2;
+        VectorUpdate bu;
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId < pivotrow) {
+            s1 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotrow) {
+            return true;
+          } else if (rowId < pivotcol) {
+            s1 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + rowId))
+                .getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId), Bytes.toBytes(EICOL + pivotcol))
+                .getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+            htable.commit(bu.getBatchUpdate());
+
+            bu = new VectorUpdate(pivotrow);
+            bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotcol) {
+            for (int i = pivotcol + 1; i < size; i++) {
+              s1 = BytesUtil.bytesToDouble(htable.get(
+                  BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + i))
+                  .getValue());
+              s2 = BytesUtil.bytesToDouble(htable.get(
+                  BytesUtil.getRowIndex(pivotcol), Bytes.toBytes(EICOL + i))
+                  .getValue());
+
+              bu = new VectorUpdate(pivotcol);
+              bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2);
+              htable.commit(bu.getBatchUpdate());
+
+              bu = new VectorUpdate(pivotrow);
+              bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2);
+              htable.commit(bu.getBatchUpdate());
+            }
+          } else { // rowId > pivotcol
+            return false;
+          }
+
+          lastRow = row;
+          processedRows++;
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      InputSplit[] splits = super.getSplits(job, numSplits);
+      List<InputSplit> newSplits = new ArrayList<InputSplit>();
+      for (InputSplit split : splits) {
+        TableSplit ts = (TableSplit) split;
+        byte[] row = ts.getStartRow();
+        if (row.length == 0) // the first split
+          newSplits.add(split);
+        else {
+          if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) {
+            newSplits.add(split);
+          }
+        }
+      }
+
+      return newSplits.toArray(new InputSplit[newSplits.size()]);
+    }
+
+    @Override
+    public RecordReader<NullWritable, NullWritable> getRecordReader(
+        InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      RotationRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin,
+            pivot_cos);
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(RotationRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMap.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMap.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.mapred.VectorInputFormat;
+
+/**
+ * Mapper that emits, for each input row, the sum of the absolute values of
+ * the row's entries. Every record is emitted under the same key (-1).
+ */
+public class MatrixNormMap extends MapReduceBase implements
+    Mapper<IntWritable, MapWritable, IntWritable, DoubleWritable> {
+  /** Constant output key shared by all emitted records. */
+  private IntWritable nKey = new IntWritable(-1);
+  /** Reusable output value holder; overwritten on every map call. */
+  private DoubleWritable nValue = new DoubleWritable();
+
+  /**
+   * Configures the map side of a job to use this mapper.
+   *
+   * @param path input path(s) handed to {@code FileInputFormat}
+   * @param map the mapper class to register
+   * @param outputKeyClass the map output key class
+   * @param outputValueClass the map output value class
+   * @param jobConf the job configuration to modify
+   */
+  public static void initJob(String path, Class<MatrixNormMap> map,
+      Class<IntWritable> outputKeyClass, Class<DoubleWritable> outputValueClass,
+      JobConf jobConf) {
+    // mapper and its output types
+    jobConf.setMapperClass(map);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapOutputValueClass(outputValueClass);
+
+    // input format, columns and location
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    FileInputFormat.addInputPaths(jobConf, path);
+  }
+
+  /**
+   * Sums the absolute values of all entries in {@code value} and collects
+   * the total under the constant key.
+   */
+  @Override
+  public void map(IntWritable key, MapWritable value,
+      OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+      throws IOException {
+
+    double absTotal = 0;
+    for (Writable cell : value.values()) {
+      double cellValue = ((DoubleEntry) cell).getValue();
+      absTotal += Math.abs(cellValue);
+    }
+
+    nValue.set(absTotal);
+    output.collect(nKey, nValue);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.mapred.VectorInputFormat;
+
+/** A catalog class that collects all the MapReduce classes used to compute a matrix's norm. */
+public class MatrixNormMapRed {
+
+  /**
+   * Sets up a job that computes a matrix norm.
+   *
+   * @param inputMatrixPath path of the input matrix
+   * @param outputPath directory in which the computed norm is written
+   * @param mapper the norm mapper class
+   * @param combiner the norm combiner class
+   * @param reducer the norm reducer class
+   * @param jobConf the job configuration to populate
+   */
+  public static void initJob(String inputMatrixPath, String outputPath,
+      Class<? extends MatrixNormMapper> mapper,
+      Class<? extends MatrixNormReducer> combiner,
+      Class<? extends MatrixNormReducer> reducer, JobConf jobConf) {
+    // input side: matrix rows read through VectorInputFormat
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    FileInputFormat.addInputPaths(jobConf, inputMatrixPath);
+
+    // map stage
+    jobConf.setMapperClass(mapper);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(DoubleWritable.class);
+
+    // combine and reduce stages
+    jobConf.setCombinerClass(combiner);
+    jobConf.setReducerClass(reducer);
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(DoubleWritable.class);
+
+    // output side: a sequence file under outputPath
+    Path outputDir = new Path(outputPath);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(jobConf, outputDir);
+  }
+
+  /**
+   * The interface of the norm mappers: a {@link Mapper} from
+   * (IntWritable, MapWritable) rows to (IntWritable, DoubleWritable) pairs.
+   * <p>
+   * Interface fields are implicitly {@code public static final}, so
+   * {@code nKey} and {@code nValue} are single instances shared by every
+   * implementing class. {@code nValue} is a mutable holder that the
+   * implementations overwrite before each {@code collect} call.
+   */
+  public static interface MatrixNormMapper extends
+      Mapper<IntWritable, MapWritable, IntWritable, DoubleWritable> {
+    IntWritable nKey = new IntWritable(-1);
+    DoubleWritable nValue = new DoubleWritable(0);
+  }
+
+  /**
+   * The interface of the norm reducers and combiners: a {@link Reducer}
+   * whose input and output are both (IntWritable, DoubleWritable) pairs.
+   * <p>
+   * Interface fields are implicitly {@code public static final}, so
+   * {@code nKey} and {@code nValue} are single instances shared by every
+   * implementing class. {@code nValue} is a mutable holder that the
+   * implementations overwrite before each {@code collect} call.
+   */
+  public static interface MatrixNormReducer extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    IntWritable nKey = new IntWritable(-1);
+    DoubleWritable nValue = new DoubleWritable(0);
+  }
+
+  // /
+  // / Infinity Norm
+  // /
+
+  /**
+   * Infinity Norm Mapper: emits the sum of the absolute values of each row's
+   * entries under the constant key.
+   */
+  public static class MatrixInfinityNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+
+      double absRowSum = 0;
+      for (Writable cell : value.values()) {
+        double cellValue = ((DoubleEntry) cell).getValue();
+        absRowSum += Math.abs(cellValue);
+      }
+
+      nValue.set(absRowSum);
+      output.collect(nKey, nValue);
+    }
+
+  }
+
+  /**
+   * Matrix Infinity Norm Reducer
+   */
+  public static class MatrixInfinityNormReducer extends MapReduceBase implements
+      MatrixNormReducer {
+
+    private double max = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+
+      while (values.hasNext()) {
+        max = Math.max(values.next().get(), max);
+      }
+
+      // Note: Tricky here. As we known, we collect each row's sum with key(-1).
+      // the reduce will just iterate through one key (-1)
+      // so we collect the max sum-value here
+      nValue.set(max);
+      output.collect(nKey, nValue);
+    }
+
+  }
+
+  // /
+  // / One Norm
+  // /
+
+  /**
+   * One Norm Mapper: for every entry of the input row, emits the entry's
+   * absolute value keyed by its column index, so that the downstream
+   * combiner/reducer can build per-column sums of absolute values.
+   */
+  public static class MatrixOneNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+
+      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+        // The one-norm is the maximum over columns of the sum of |a_ij|.
+        // The absolute value must be taken per cell, before summing;
+        // otherwise entries of opposite sign cancel within a column.
+        nValue.set(Math.abs(((DoubleEntry) e.getValue()).getValue()));
+        output.collect((IntWritable) e.getKey(), nValue);
+      }
+    }
+  }
+
+  /**
+   * One Norm Combiner: adds up the values received for one column key and
+   * emits the partial sum under that same key.
+   */
+  public static class MatrixOneNormCombiner extends MapReduceBase implements
+      MatrixNormReducer {
+
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+
+      double columnPartial = 0;
+      while (values.hasNext()) {
+        DoubleWritable cell = values.next();
+        columnPartial += cell.get();
+      }
+
+      nValue.set(columnPartial);
+      output.collect(key, nValue);
+    }
+  }
+
+  /**
+   * One Norm Reducer: sums the values of each column key, tracks the largest
+   * absolute column sum across all reduce calls, and writes that maximum to
+   * the file "reduce-out" in the job's output directory when the task closes.
+   */
+  public static class MatrixOneNormReducer extends MapReduceBase implements
+      MatrixNormReducer {
+    /** Largest absolute column sum seen so far by this reducer instance. */
+    private double max = 0;
+    /** Job output directory, captured in {@link #configure(JobConf)}. */
+    private Path outDir;
+    /** Job configuration, needed to open the file system on close. */
+    private JobConf conf;
+
+    @Override
+    public void configure(JobConf job) {
+      outDir = FileOutputFormat.getOutputPath(job);
+      conf = job;
+    }
+
+    /**
+     * Sums all values of one column and folds the absolute sum into the
+     * running maximum. Nothing is collected here; the result is written in
+     * {@link #close()}.
+     */
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      double colSum = 0;
+      while (values.hasNext()) {
+        colSum += values.next().get();
+      }
+
+      max = Math.max(Math.abs(colSum), max);
+    }
+
+    /**
+     * Writes the pair (-1, max) to a sequence file named "reduce-out".
+     *
+     * @throws IOException if the file cannot be created or written
+     */
+    @Override
+    public void close() throws IOException {
+      // write output to a file
+      Path outFile = new Path(outDir, "reduce-out");
+      FileSystem fileSys = FileSystem.get(conf);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          outFile, IntWritable.class, DoubleWritable.class,
+          CompressionType.NONE);
+      try {
+        writer.append(new IntWritable(-1), new DoubleWritable(max));
+      } finally {
+        // always release the writer, even when append fails
+        writer.close();
+      }
+    }
+  }
+
+  // /
+  // / Frobenius Norm
+  // /
+
+  /**
+   * Frobenius Norm Mapper: emits the sum of the squared entries of each row
+   * under the constant key.
+   */
+  public static class MatrixFrobeniusNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      double rowSquareSum = 0;
+      for (Writable cell : value.values()) {
+        double cellValue = ((DoubleEntry) cell).getValue();
+        rowSquareSum += (cellValue * cellValue);
+      }
+
+      nValue.set(rowSquareSum);
+      output.collect(nKey, nValue);
+    }
+  }
+
+  /** Frobenius Norm Combiner */
+  public static class MatrixFrobeniusNormCombiner extends MapReduceBase
+      implements MatrixNormReducer {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        sqrtSum += values.next().get();
+      }
+      // Note: Tricky here. As we known, we collect each row's sum with key(-1).
+      // the reduce will just iterate through one key (-1)
+      // so we collect the max sum-value here
+      nValue.set(sqrtSum);
+      output.collect(nKey, nValue);
+    }
+  }
+
+  /** Frobenius Norm Reducer */
+  public static class MatrixFrobeniusNormReducer extends MapReduceBase
+      implements MatrixNormReducer {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        sqrtSum += values.next().get();
+      }
+
+      // Note: Tricky here. As we known, we collect each row's sum with key(-1).
+      // the reduce will just iterate through one key (-1)
+      // so we collect the max sum-value here
+      nValue.set(Math.sqrt(sqrtSum));
+      output.collect(nKey, nValue);
+    }
+  }
+
+  // /
+  // / MaxValue Norm
+  // /
+
+  /**
+   * MaxValue Norm Mapper: emits the largest absolute entry of each row under
+   * the constant key.
+   */
+  public static class MatrixMaxValueNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      double rowMax = 0;
+      for (Writable cell : value.values()) {
+        double magnitude = Math.abs(((DoubleEntry) cell).getValue());
+        if (magnitude > rowMax) {
+          rowMax = magnitude;
+        }
+      }
+
+      nValue.set(rowMax);
+      output.collect(nKey, nValue);
+    }
+
+  }
+
+  /** MaxValue Norm Reducer */
+  public static class MatrixMaxValueNormReducer extends MapReduceBase implements
+      MatrixNormReducer {
+    private double max = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        max = Math.max(values.next().get(), max);
+      }
+
+      // Note: Tricky here. As we known, we collect each row's sum with key(-1).
+      // the reduce will just iterate through one key (-1)
+      // so we collect the max sum-value here
+      nValue.set(max);
+      output.collect(nKey, nValue);
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormReduce.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormReduce.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+/**
+ * Reducer that tracks the largest value it receives across all reduce calls
+ * and, when the task closes, writes that maximum to a sequence file named
+ * "reduce-out" in the directory configured through
+ * {@link #initJob(String, Class, JobConf)}.
+ */
+public class MatrixNormReduce extends MapReduceBase implements
+    Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+  /** Largest value seen so far by this reducer instance. */
+  private double max = 0;
+  /** Output directory read from the job configuration. */
+  private String outDir = "";
+  /** Job configuration, needed to open the file system on close. */
+  private JobConf conf;
+  /** Configuration key under which the output directory is stored. */
+  private static final String OUTPUT = "hama.multiplication.matrix.a";
+
+  /**
+   * Captures the job configuration and the output directory stored in it.
+   *
+   * @param job the configuration of the running job
+   */
+  @Override
+  public void configure(JobConf job) {
+    outDir = job.get(OUTPUT, "");
+    conf = job;
+  }
+
+  /**
+   * Configures the reduce side of a job to use this reducer.
+   *
+   * @param path directory in which the result file is written
+   * @param reducer the reducer class to register
+   * @param jobConf the job configuration to modify
+   */
+  public static void initJob(String path, Class<MatrixNormReduce> reducer,
+      JobConf jobConf) {
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.setReducerClass(reducer);
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(DoubleWritable.class);
+    jobConf.set(OUTPUT, path);
+  }
+
+  /**
+   * Folds every incoming value into the running maximum. Nothing is
+   * collected here; the result is written in {@link #close()}.
+   */
+  @Override
+  public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+      OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+      throws IOException {
+
+    while (values.hasNext()) {
+      max = Math.max(values.next().get(), max);
+    }
+
+  }
+
+  /**
+   * Reduce task done: writes the pair (-1, max), where max is the largest
+   * value received by this reducer, to the "reduce-out" sequence file.
+   *
+   * @throws IOException if the file cannot be created or written
+   */
+  @Override
+  public void close() throws IOException {
+    // write output to a file
+    Path outFile = new Path(outDir, "reduce-out");
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+        outFile, IntWritable.class, DoubleWritable.class, CompressionType.NONE);
+    try {
+      writer.append(new IntWritable(-1), new DoubleWritable(max));
+    } finally {
+      // always release the writer, even when append fails
+      writer.close();
+    }
+  }
+}



Mime
View raw message