Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: (qmail 28146 invoked from network); 10 Sep 2009 05:33:38 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Sep 2009 05:33:38 -0000 Received: (qmail 86713 invoked by uid 500); 10 Sep 2009 05:33:38 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 86690 invoked by uid 500); 10 Sep 2009 05:33:38 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 86680 invoked by uid 99); 10 Sep 2009 05:33:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Sep 2009 05:33:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Sep 2009 05:33:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 228A02388962; Thu, 10 Sep 2009 05:33:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r813233 [4/7] - in /incubator/hama/branches: ./ hama-0.19/ hama-0.19/bin/ hama-0.19/conf/ hama-0.19/lib/ hama-0.19/lib/findbugs/ hama-0.19/lib/findbugs/plugin/ hama-0.19/lib/jetty-ext/ hama-0.19/src/ hama-0.19/src/docs/ hama-0.19/src/docs/s... Date: Thu, 10 Sep 2009 05:32:59 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090910053302.228A02388962@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,294 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hama.algebra.SparseMatrixVectorMultMap; +import org.apache.hama.algebra.SparseMatrixVectorMultReduce; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.mapred.RandomMatrixMap; +import org.apache.hama.mapred.RandomMatrixReduce; +import org.apache.hama.util.BytesUtil; +import org.apache.hama.util.JobManager; +import org.apache.hama.util.RandomVariable; + +public class SparseMatrix extends AbstractMatrix implements Matrix { + static private final String TABLE_PREFIX = SparseMatrix.class.getSimpleName(); + static private final Path TMP_DIR = new Path(SparseMatrix.class + .getSimpleName() + + "_TMP_dir"); + + public SparseMatrix(HamaConfiguration conf, int m, int n) throws IOException { + setConfiguration(conf); + + tryToCreateTable(TABLE_PREFIX); + closed = false; + this.setDimension(m, n); + } + + /** + * Load a matrix from an existed matrix table whose tablename is 'matrixpath' !! + * It is an internal used for map/reduce. + * + * @param conf configuration object + * @param matrixpath + * @throws IOException + * @throws IOException + */ + public SparseMatrix(HamaConfiguration conf, String matrixpath) + throws IOException { + setConfiguration(conf); + matrixPath = matrixpath; + // load the matrix + table = new HTable(conf, matrixPath); + // TODO: now we don't increment the reference of the table + // for it's an internal use for map/reduce. + // if we want to increment the reference of the table, + // we don't know where to call Matrix.close in Add & Mul map/reduce + // process to decrement the reference. It seems difficulty. + } + + /** + * Generate matrix with random elements + * + * @param conf configuration object + * @param m the number of rows. + * @param n the number of columns. + * @return an m-by-n matrix with uniformly distributed random elements. 
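+ *
+ * <p>Usage sketch (illustrative sizes only):
+ * <pre>
+ *   HamaConfiguration conf = new HamaConfiguration();
+ *   SparseMatrix rand = SparseMatrix.random(conf, 100, 100);
+ *   double v = rand.get(0, 50);  // 0.0 where no entry was stored
+ * </pre>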
+ * @throws IOException + */ + public static SparseMatrix random(HamaConfiguration conf, int m, int n) + throws IOException { + SparseMatrix rand = new SparseMatrix(conf, m, n); + SparseVector vector = new SparseVector(); + LOG.info("Create the " + m + " * " + n + " random matrix : " + + rand.getPath()); + + for (int i = 0; i < m; i++) { + vector.clear(); + for (int j = 0; j < n; j++) { + Random r = new Random(); + if(r.nextInt(2) != 0) + vector.set(j, RandomVariable.rand()); + } + rand.setRow(i, vector); + } + + return rand; + } + + public static SparseMatrix random_mapred(HamaConfiguration conf, int m, int n, double percent) throws IOException { + SparseMatrix rand = new SparseMatrix(conf, m, n); + LOG.info("Create the " + m + " * " + n + " random matrix : " + + rand.getPath()); + + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("random matrix MR job : " + rand.getPath()); + + jobConf.setNumMapTasks(conf.getNumMapTasks()); + jobConf.setNumReduceTasks(conf.getNumReduceTasks()); + + final Path inDir = new Path(TMP_DIR, "in"); + FileInputFormat.setInputPaths(jobConf, inDir); + jobConf.setMapperClass(RandomMatrixMap.class); + jobConf.setMapOutputKeyClass(IntWritable.class); + jobConf.setMapOutputValueClass(MapWritable.class); + + RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class, + jobConf); + jobConf.setSpeculativeExecution(false); + jobConf.setInt("matrix.column", n); + jobConf.set("matrix.type", TABLE_PREFIX); + jobConf.set("matrix.density", String.valueOf(percent)); + + jobConf.setInputFormat(SequenceFileInputFormat.class); + final FileSystem fs = FileSystem.get(jobConf); + int interval = m / conf.getNumMapTasks(); + + // generate an input file for each map task + for (int i = 0; i < conf.getNumMapTasks(); ++i) { + final Path file = new Path(inDir, "part" + i); + final IntWritable start = new IntWritable(i * interval); + IntWritable end = null; + if ((i + 1) != conf.getNumMapTasks()) { + end = new IntWritable(((i * interval) + interval) - 1); + } else { + end = new IntWritable(m - 1); + } + final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf, + file, IntWritable.class, IntWritable.class, CompressionType.NONE); + try { + writer.append(start, end); + } finally { + writer.close(); + } + System.out.println("Wrote input for Map #" + i); + } + + JobClient.runJob(jobConf); + fs.delete(TMP_DIR, true); + return rand; + } + + @Override + public Matrix add(Matrix B) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Matrix add(double alpha, Matrix B) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public double get(int i, int j) throws IOException { + if(this.getRows() < i || this.getColumns() < j) + throw new ArrayIndexOutOfBoundsException(i +", "+ j); + + Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j)); + return (c != null) ? 
BytesUtil.bytesToDouble(c.getValue()) : 0.0; + } + + @Override + public Vector getColumn(int j) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /** + * Gets the vector of row + * + * @param i the row index of the matrix + * @return the vector of row + * @throws IOException + */ + public SparseVector getRow(int i) throws IOException { + return new SparseVector(table.getRow(BytesUtil.getRowIndex(i), new byte[][] { Bytes.toBytes(Constants.COLUMN) })); + } + + /** {@inheritDoc} */ + public void set(int i, int j, double value) throws IOException { + if(value != 0) { + VectorUpdate update = new VectorUpdate(i); + update.put(j, value); + table.commit(update.getBatchUpdate()); + } + } + + /** + * Returns type of matrix + */ + public String getType() { + return this.getClass().getSimpleName(); + } + + /** + * C = A*B using iterative method + * + * @param B + * @return C + * @throws IOException + */ + public SparseMatrix mult(Matrix B) throws IOException { + SparseMatrix result = new SparseMatrix(config, this.getRows(), this.getColumns()); + + for(int i = 0; i < this.getRows(); i++) { + JobConf jobConf = new JobConf(config); + jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i); + + jobConf.setNumMapTasks(config.getNumMapTasks()); + jobConf.setNumReduceTasks(config.getNumReduceTasks()); + + SparseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), SparseMatrixVectorMultMap.class, + IntWritable.class, MapWritable.class, jobConf); + SparseMatrixVectorMultReduce.initJob(result.getPath(), SparseMatrixVectorMultReduce.class, + jobConf); + JobManager.execute(jobConf); + } + + return result; + } + + @Override + public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /** + * Computes the given norm of the matrix + * + * @param type + * @return norm of the matrix + * @throws IOException + */ + public double norm(Norm type) throws IOException { + if (type == Norm.One) + return getNorm1(); + else if (type == Norm.Frobenius) + return getFrobenius(); + else if (type == Norm.Infinity) + return getInfinity(); + else + return getMaxvalue(); + } + + @Override + public void setColumn(int column, Vector vector) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void setRow(int row, Vector vector) throws IOException { + if(this.getRows() < row) + throw new ArrayIndexOutOfBoundsException(row); + + if(vector.size() > 0) { // stores if size > 0 + VectorUpdate update = new VectorUpdate(row); + update.putAll(((SparseVector) vector).getEntries()); + table.commit(update.getBatchUpdate()); + } + } + + @Override + public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException { + // TODO Auto-generated method stub + return null; + } + +} \ No newline at end of file Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,185 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hama.io.DoubleEntry; +import org.apache.log4j.Logger; + +/** + * This class represents a sparse vector. + */ +public class SparseVector extends AbstractVector implements Vector { + static final Logger LOG = Logger.getLogger(SparseVector.class); + + public SparseVector() { + this(new MapWritable()); + } + + public SparseVector(MapWritable m) { + this.entries = m; + } + + public SparseVector(RowResult row) { + this.initMap(row); + } + + @Override + public Vector add(double alpha, Vector v) { + if (alpha == 0) + return this; + + for (Map.Entry e : v.getEntries().entrySet()) { + if (this.entries.containsKey(e.getKey())) { + // add + double value = alpha * ((DoubleEntry) e.getValue()).getValue() + + this.get(((IntWritable) e.getKey()).get()); + this.entries.put(e.getKey(), new DoubleEntry(value)); + } else { + // put + double value = alpha * ((DoubleEntry) e.getValue()).getValue(); + this.entries.put(e.getKey(), new DoubleEntry(value)); + } + } + + return this; + } + + /** + * x = v + x + * + * @param v2 + * @return x = v + x + */ + public SparseVector add(Vector v2) { + + for (Map.Entry e : v2.getEntries().entrySet()) { + int key = ((IntWritable) e.getKey()).get(); + if (this.entries.containsKey(e.getKey())) { + this.add(key, ((DoubleEntry) e.getValue()).getValue()); + } else { + this.set(key, ((DoubleEntry) e.getValue()).getValue()); + } + } + + return this; + } + + @Override + public double dot(Vector v) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public double norm(Norm type) { + // TODO Auto-generated method stub + return 0; + } + + /** + * v = alpha*v + * + * @param alpha + * @return v = alpha*v + */ + public SparseVector scale(double alpha) { + for (Map.Entry e : this.entries.entrySet()) { + this.entries.put(e.getKey(), new DoubleEntry(((DoubleEntry) e.getValue()) + .getValue() + * alpha)); + } + return this; + } + + /** + * Gets the value of index + * + * @param index + * @return the value of v(index) + * @throws IOException + */ + public double get(int index) { + double value; + try { + value = ((DoubleEntry) this.entries.get(new IntWritable(index))) + .getValue(); + } catch (NullPointerException e) { // returns zero if there is no value + return 0; + } + + return value; + } + + /** + * Sets the value of index + * + * @param index + * @param value + */ + public void set(int index, double value) { + // If entries are null, create new object + if (this.entries == null) { + this.entries = new MapWritable(); + } + + if (value != 0) // only stores non-zero element + this.entries.put(new IntWritable(index), 
new DoubleEntry(value)); + } + + /** + * Adds the value to v(index) + * + * @param index + * @param value + */ + public void add(int index, double value) { + set(index, get(index) + value); + } + + /** + * Sets the vector + * + * @param v + * @return x = v + */ + public SparseVector set(Vector v) { + return new SparseVector(v.getEntries()); + } + + @Override + public Vector subVector(int i0, int i1) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Vector set(double alpha, Vector v) { + // TODO Auto-generated method stub + return null; + } + +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,222 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hama.util.BytesUtil; +import org.apache.log4j.Logger; + +/** + * A sub matrix is a matrix formed by selecting certain rows and columns from a + * bigger matrix. This is a in-memory operation only. 
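+ *
+ * <p>Example (illustrative values):
+ * <pre>
+ *   SubMatrix a = new SubMatrix(new double[][] { { 1, 2 }, { 3, 4 } });
+ *   SubMatrix b = new SubMatrix(new double[][] { { 5, 6 }, { 7, 8 } });
+ *   SubMatrix c = a.mult(b);  // c.get(0, 0) == 1*5 + 2*7 == 19
+ * </pre>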
+ */ +public class SubMatrix { + static final Logger LOG = Logger.getLogger(SubMatrix.class); + private double[][] matrix; + + /** + * Constructor + * + * @param i the size of rows + * @param j the size of columns + */ + public SubMatrix(int i, int j) { + this.matrix = new double[i][j]; + } + + /** + * Constructor + * + * @param c a two dimensional double array + */ + public SubMatrix(double[][] c) { + double[][] matrix = c; + this.matrix = matrix; + } + + public SubMatrix(byte[] matrix) throws IOException { + ByteArrayInputStream bos = new ByteArrayInputStream(matrix); + DataInputStream dis = new DataInputStream(bos); + + int rows = dis.readInt(); + int columns = dis.readInt(); + this.matrix = new double[rows][columns]; + + for(int i = 0; i < rows; i++) { + for(int j = 0; j < columns; j++) { + this.matrix[i][j] = dis.readDouble(); + } + } + + dis.close(); + bos.close(); + } + + /** + * Sets the value + * + * @param row + * @param column + * @param value + */ + public void set(int row, int column, double value) { + matrix[row][column] = value; + } + + /** + * Sets the value + * + * @param row + * @param column + * @param value + */ + public void set(int row, int column, byte[] value) { + matrix[row][column] = BytesUtil.bytesToDouble(value); + } + + /** + * Gets the value + * + * @param i + * @param j + * @return the value of submatrix(i, j) + */ + public double get(int i, int j) { + return matrix[i][j]; + } + + public void add(int row, int column, double value) { + matrix[row][column] = matrix[row][column] + value; + } + + /** + * c = a+b + * + * @param b + * @return c + */ + public SubMatrix add(SubMatrix b) { + SubMatrix c = new SubMatrix(this.getRows(), this.getColumns()); + + for (int i = 0; i < this.getRows(); i++) { + for (int j = 0; j < this.getColumns(); j++) { + c.set(i, j, (this.get(i, j) + b.get(i, j))); + } + } + + return c; + } + + /** + * c = a*b + * + * @param b + * @return c + */ + public SubMatrix mult(SubMatrix b) { + SubMatrix c = new SubMatrix(this.getRows(), b.getColumns()); + + for (int i = 0; i < this.getRows(); i++) { + for (int j = 0; j < b.getColumns(); j++) { + for (int k = 0; k < this.getColumns(); k++) { + c.add(i, j, this.get(i, k) * b.get(k, j)); + } + } + } + + return c; + } + + /** + * Gets the number of rows + * + * @return the number of rows + */ + public int getRows() { + return this.matrix.length; + } + + /** + * Gets the number of columns + * + * @return the number of columns + */ + public int getColumns() { + return this.matrix[0].length; + } + + /** + * Close + */ + public void close() { + matrix = null; + } + + /** + * @return the 2d double array + */ + public double[][] getDoubleArray() { + double[][] result = matrix; + return result; + } + + /** + * Gets the bytes of the sub matrix + * + * @return the bytes of the sub matrix + * @throws IOException + */ + public byte[] getBytes() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeInt(this.getRows()); + dos.writeInt(this.getColumns()); + + for(int i = 0; i < this.getRows(); i++) { + for(int j = 0; j < this.getColumns(); j++) { + dos.writeDouble(this.get(i, j)); + } + } + + byte[] data = bos.toByteArray(); + dos.close(); + bos.close(); + return data; + } + + public String toString() { + StringBuilder result = new StringBuilder(); + for (int i = 0; i < this.getRows(); i++) { + for (int j = 0; j < this.getColumns(); j++) { + result.append(this.get(i, j)); + result.append('\t'); + } + result.append('\n'); + } 
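+    // one row per line, values separated by tabs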
+ return result.toString(); + } +} + Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,161 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.util.Iterator; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; + +/** + * Basic vector interface. + */ +public interface Vector { + + /** + * Size of the vector + * + * @return size of the vector + */ + public int size(); + + /** + * Gets the value of index + * + * @param index + * @return v(index) + */ + public double get(int index); + + /** + * Sets the value of index + * + * @param index + * @param value + */ + public void set(int index, double value); + + /** + * Sets the vector + * + * @param v + * @return x = v + */ + public Vector set(Vector v); + + /** + * x = alpha * v + * + * @param alpha + * @param v + * @return x = alpha * v + */ + public Vector set(double alpha, Vector v); + + /** + * Adds the value to v(index) + * + * @param index + * @param value + */ + public void add(int index, double value); + + /** + * x = alpha*v + x + * + * @param alpha + * @param v + * @return x = alpha*v + x + */ + public Vector add(double alpha, Vector v); + + /** + * x = v + x + * + * @param v + * @return x = v + x + */ + public Vector add(Vector v); + + /** + * x dot v + * + * @param v + * @return x dot v + */ + public double dot(Vector v); + + /** + * v = alpha*v + * + * @param alpha + * @return v = alpha*v + */ + public Vector scale(double alpha); + + /** + * Returns a sub-vector. + * + * @param i0 the index of the first element + * @param i1 the index of the last element + * @return v[i0:i1] + */ + public Vector subVector( int i0, int i1 ); + + /** + * Computes the given norm of the vector + * + * @param type + * @return norm of the vector + */ + public double norm(Norm type); + + /** + * Supported vector-norms. 
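+ *
+ * <p>For a vector x: One = sum of |x(i)|, Two = sqrt(sum of x(i)^2),
+ * Infinity = max of |x(i)|.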
+ */ + enum Norm { + + /** Sum of the absolute values of the entries */ + One, + + /** The root of sum of squares */ + Two, + + /** The robust norm of the vector */ + TwoRobust, + + /** Largest entry in absolute value */ + Infinity + } + + /** + * Returns an iterator + * + * @return iterator + */ + public Iterator iterator(); + + /** + * Returns the {@link org.apache.hadoop.io.MapWritable} + * + * @return the entries of vector + */ + public MapWritable getEntries(); +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,62 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; + +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.Constants; +import org.apache.hama.SubMatrix; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.mapred.BlockInputFormat; +import org.apache.log4j.Logger; + +public class BlockMultiplyMap extends MapReduceBase implements + Mapper { + static final Logger LOG = Logger.getLogger(BlockMultiplyMap.class); + + public static void initJob(String matrix_a, + Class map, Class outputKeyClass, + Class outputValueClass, JobConf jobConf) { + + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + + jobConf.setInputFormat(BlockInputFormat.class); + FileInputFormat.addInputPaths(jobConf, matrix_a); + + jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK); + } + + @Override + public void map(BlockID key, BlockWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + SubMatrix c = value.get(0).mult(value.get(1)); + output.collect(key, new BlockWritable(c)); + } +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,86 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.SubMatrix; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.mapred.VectorOutputFormat; +import org.apache.log4j.Logger; + +public class BlockMultiplyReduce extends MapReduceBase implements + Reducer { + static final Logger LOG = Logger.getLogger(BlockMultiplyReduce.class); + + /** + * Use this before submitting a BlockCyclicMultiplyReduce job. It will + * appropriately set up the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initJob(String table, + Class reducer, JobConf job) { + job.setOutputFormat(VectorOutputFormat.class); + job.setReducerClass(reducer); + job.set(VectorOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(BatchUpdate.class); + } + + @Override + public void reduce(BlockID key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + SubMatrix s = null; + while (values.hasNext()) { + SubMatrix b = values.next().getMatrices().next(); + if (s == null) { + s = b; + } else { + s = s.add(b); + } + } + + int startRow = key.getRow() * s.getRows(); + int startColumn = key.getColumn() * s.getColumns(); + + for (int i = 0; i < s.getRows(); i++) { + VectorUpdate update = new VectorUpdate(i + startRow); + for (int j = 0; j < s.getColumns(); j++) { + update.put(j + startColumn, s.get(i, j)); + } + output.collect(new IntWritable(key.getRow()), update); + } + } +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,85 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.Constants; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.mapred.VectorInputFormat; +import org.apache.log4j.Logger; + +public class DenseMatrixVectorMultMap extends MapReduceBase implements + Mapper { + static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultMap.class); + protected DenseVector currVector; + public static final String ITH_ROW = "ith.row"; + public static final String MATRIX_A = "hama.multiplication.matrix.a"; + public static final String MATRIX_B = "hama.multiplication.matrix.b"; + private IntWritable nKey = new IntWritable(); + + public void configure(JobConf job) { + DenseMatrix matrix_a; + try { + matrix_a = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, "")); + int ithRow = job.getInt(ITH_ROW, 0); + nKey.set(ithRow); + currVector = matrix_a.getRow(ithRow); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void initJob(int i, String matrix_a, String matrix_b, + Class map, Class outputKeyClass, + Class outputValueClass, JobConf jobConf) { + + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + jobConf.setInt(ITH_ROW, i); + jobConf.set(MATRIX_A, matrix_a); + jobConf.set(MATRIX_B, matrix_b); + + jobConf.setInputFormat(VectorInputFormat.class); + FileInputFormat.addInputPaths(jobConf, matrix_b); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + } + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + + DenseVector scaled = new DenseVector(value).scale(currVector.get(key.get())); + output.collect(nKey, scaled.getEntries()); + + } +} \ No newline at end of file Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,81 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.DenseVector; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.mapred.VectorOutputFormat; +import org.apache.log4j.Logger; + +public class DenseMatrixVectorMultReduce extends MapReduceBase implements + Reducer { + static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultReduce.class); + + /** + * Use this before submitting a TableReduce job. It will appropriately set up + * the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initJob(String table, + Class reducer, JobConf job) { + job.setOutputFormat(VectorOutputFormat.class); + job.setReducerClass(reducer); + job.set(VectorOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(BatchUpdate.class); + } + + @Override + public void reduce(IntWritable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + DenseVector sum = new DenseVector(); + + while (values.hasNext()) { + DenseVector nVector = new DenseVector(values.next()); + if(sum.size() == 0) { + sum.zeroFill(nVector.size()); + sum.add(nVector); + } else { + sum.add(nVector); + } + } + + VectorUpdate update = new VectorUpdate(key.get()); + update.putAll(sum.getEntries()); + + output.collect(key, update); + } + +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,583 @@ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.mapred.TableSplit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; 
+import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.Constants; +import org.apache.hama.io.DoubleEntry; +import org.apache.hama.io.Pair; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.mapred.HTableInputFormatBase; +import org.apache.hama.mapred.HTableRecordReaderBase; +import org.apache.hama.util.BytesUtil; + +/** + * A catalog class collect all the m/r classes to compute the matrix's eigen + * values + */ +public class JacobiEigenValue { + + /** a matrix copy of the original copy collected in "eicol" family * */ + public static final String EICOL = "eicol:"; + /** a column family collect all values and statuses used during computation * */ + public static final String EI = "eival:"; + /** a column collect all the eigen values * */ + public static final String EIVAL = EI + "value"; + /** a column identify whether the eigen values have been changed * */ + public static final String EICHANGED = EI + "changed"; + /** a column identify the index of the max absolute value each row * */ + public static final String EIIND = EI + "ind"; + /** a matrix collect all the eigen vectors * */ + public static final String EIVEC = "eivec:"; + public static final String MATRIX = "hama.jacobieigenvalue.matrix"; + /** parameters for pivot * */ + public static final String PIVOTROW = "hama.jacobi.pivot.row"; + public static final String PIVOTCOL = "hama.jacobi.pivot.col"; + public static final String PIVOTSIN = "hama.jacobi.pivot.sin"; + public static final String PIVOTCOS = "hama.jacobi.pivot.cos"; + + static final Log LOG = LogFactory.getLog(JacobiEigenValue.class); + + /** + * The matrix will be modified during computing eigen value. So a new matrix + * will be created to prevent the original matrix being modified. To reduce + * the network transfer, we copy the "column" family in the original matrix to + * a "eicol" family. All the following modification will be done over "eicol" + * family. + * + * And the output Eigen Vector Arrays "eivec", and the output eigen value + * array "eival:value", and the temp status array "eival:changed", "eival:ind" + * will be created. 
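+ *
+ * <p>For reference, the classical Jacobi method: each iteration picks the
+ * off-diagonal entry with the largest absolute value as the pivot (p, q) and
+ * applies the plane rotation
+ *   a'(i,p) = cos * a(i,p) - sin * a(i,q)
+ *   a'(i,q) = sin * a(i,p) + cos * a(i,q)
+ * which zeroes a(p,q); the diagonal converges to the eigenvalues and the
+ * accumulated rotations form the eigenvectors stored in "eivec".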
+ * + * Also "eival:state" will record the state of the rotation state of a matrix + */ + public static class InitMapper extends MapReduceBase implements + Mapper { + + HTable table; + + @Override + public void configure(JobConf job) { + String tableName = job.get(MATRIX, ""); + try { + table = new HTable(new HBaseConfiguration(job), tableName); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector collector, Reporter reporter) + throws IOException { + int row, col; + row = key.get(); + VectorUpdate vu = new VectorUpdate(row); + + double val; + double maxVal = Double.MIN_VALUE; + int maxInd = row + 1; + + boolean init = true; + for (Map.Entry e : value.entrySet()) { + val = ((DoubleEntry) e.getValue()).getValue(); + col = ((IntWritable) e.getKey()).get(); + // copy the original matrix to "EICOL" family + vu.put(JacobiEigenValue.EICOL, col, val); + // make the "EIVEC" a dialog matrix + vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0); + if (col == row) { + vu.put(JacobiEigenValue.EIVAL, val); + } + // find the max index + if (col > row) { + if (init) { + maxInd = col; + maxVal = val; + init = false; + } else { + if (Math.abs(val) > Math.abs(maxVal)) { + maxVal = val; + maxInd = col; + } + } + } + } + // index array + vu.put(JacobiEigenValue.EIIND, maxInd); + // Changed Array set to be true during initialization + vu.put(JacobiEigenValue.EICHANGED, 1); + + table.commit(vu.getBatchUpdate()); + } + + } + + /** + * PivotInputFormat & PivotMapper & PivotReducer are used to find the pivot in + * a matrix + */ + public static class PivotInputFormat extends HTableInputFormatBase implements + InputFormat, JobConfigurable { + + private PivotRecordReader tableRecordReader; + + protected static class PivotRecordReader extends HTableRecordReaderBase + implements RecordReader { + + private int totalRows; + private int processedRows; + private int size; + boolean mocked = true; + + @Override + public void init() throws IOException { + super.init(); + + Cell rows = null; + rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS); + size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0; + + if (endRow.length == 0) { // the last split, we don't know the end row + totalRows = 0; // so we just skip it. + } else { + if (startRow.length == 0) { // the first split, start row is 0 + totalRows = BytesUtil.bytesToInt(endRow); + } else { + totalRows = BytesUtil.bytesToInt(endRow) + - BytesUtil.bytesToInt(startRow); + } + } + processedRows = 0; + LOG.info("Split (" + Bytes.toString(startRow) + ", " + + Bytes.toString(endRow) + ") -> " + totalRows); + } + + /** + * @return Pair + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public Pair createKey() { + return new Pair(); + } + + /** + * @return DoubleWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public DoubleWritable createValue() { + return new DoubleWritable(); + } + + /** + * @param key Pair as input key. 
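+ *            (filled with the row id and the column index of that row's largest absolute value)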
+ * @param value DoubleWritable as input value + * + * Converts Scanner.next() to Pair, DoubleWritable + * + * @return true if there was more data + * @throws IOException + */ + public boolean next(Pair key, DoubleWritable value) throws IOException { + RowResult result; + try { + result = this.scanner.next(); + } catch (UnknownScannerException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + restart(lastRow); + this.scanner.next(); // skip presumed already mapped row + result = this.scanner.next(); + } + + boolean hasMore = result != null && result.size() > 0; + if (hasMore) { + byte[] row = result.getRow(); + int rowId = BytesUtil.bytesToInt(row); + if (rowId == size - 1) { // skip the last row + if (mocked) { + key.set(Integer.MAX_VALUE, Integer.MAX_VALUE); + mocked = false; + return true; + } else { + return false; + } + } + + byte[] col = result.get(EIIND).getValue(); + int colId = BytesUtil.bytesToInt(col); + double val = 0; + + // get (rowId, colId)'s value + Cell cell = htable.get(BytesUtil.getRowIndex(rowId), Bytes + .toBytes(EICOL + colId)); + if (cell != null && cell.getValue() != null) { + val = BytesUtil.bytesToDouble(cell.getValue()); + } + + key.set(rowId, colId); + value.set(val); + + lastRow = row; + processedRows++; + } else { + if (mocked) { + key.set(Integer.MAX_VALUE, Integer.MAX_VALUE); + mocked = false; + return true; + } else { + return false; + } + } + return hasMore; + } + + @Override + public float getProgress() { + if (totalRows <= 0) { + return 0; + } else { + return Math.min(1.0f, processedRows / (float) totalRows); + } + } + + } + + @Override + public RecordReader getRecordReader(InputSplit split, + JobConf conf, Reporter reporter) throws IOException { + TableSplit tSplit = (TableSplit) split; + PivotRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new PivotRecordReader(); + } + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.init(); + return trr; + } + + protected void setTableRecordReader(PivotRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + } + + // find the pivot of the matrix + public static class PivotMapper extends MapReduceBase implements + Mapper { + + private double max = 0; + private Pair pair = new Pair(0, 0); + private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE); + private DoubleWritable dummyVal = new DoubleWritable(0.0); + + @Override + public void map(Pair key, DoubleWritable value, + OutputCollector collector, Reporter reporter) + throws IOException { + if (key.getRow() != Integer.MAX_VALUE) { + if (Math.abs(value.get()) > Math.abs(max)) { + pair.set(key.getRow(), key.getColumn()); + max = value.get(); + } + } else { + collector.collect(pair, new DoubleWritable(max)); + collector.collect(dummyPair, dummyVal); + } + } + + } + + public static class PivotReducer extends MapReduceBase implements + Reducer { + + private double max = 0; + private Pair pair = new Pair(0, 0); + + @Override + public void reduce(Pair key, Iterator values, + OutputCollector collector, Reporter reporter) + throws IOException { + double val; + if (key.getRow() != Integer.MAX_VALUE) { + val = values.next().get(); + if (Math.abs(val) > Math.abs(max)) { + pair.set(key.getRow(), key.getColumn()); + max = val; + } + } else { + collector.collect(pair, new DoubleWritable(max)); + 
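+          // the (Integer.MAX_VALUE, Integer.MAX_VALUE) sentinel key sorts last,
+          // so the global maximum is emitted as the pivot when it arrives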
} + } + + } + + /** + * Tricky here! we rotation the matrix during we scan the matrix and update to + * the matrix so we just need a rotationrecordreader to scan the matrix and do + * the rotation the mapper&reducer just a dummy mapper + */ + public static class RotationInputFormat extends HTableInputFormatBase + implements InputFormat, JobConfigurable { + + private RotationRecordReader tableRecordReader; + + int pivot_row, pivot_col; + double pivot_cos, pivot_sin; + + public void configure(JobConf job) { + super.configure(job); + pivot_row = job.getInt(PIVOTROW, -1); + pivot_col = job.getInt(PIVOTCOL, -1); + pivot_sin = Double.parseDouble(job.get(PIVOTSIN)); + pivot_cos = Double.parseDouble(job.get(PIVOTCOS)); + } + + protected static class RotationRecordReader extends HTableRecordReaderBase + implements RecordReader { + + private int totalRows; + private int processedRows; + int startRowId, endRowId = -1; + int size; + + int pivotrow, pivotcol; + byte[] prow, pcol; + double pivotcos, pivotsin; + + public RotationRecordReader(int pr, int pc, double psin, double pcos) { + super(); + pivotrow = pr; + pivotcol = pc; + pivotsin = psin; + pivotcos = pcos; + prow = Bytes.toBytes(pivotrow); + pcol = Bytes.toBytes(pivotcol); + LOG.info(prow); + LOG.info(pcol); + } + + @Override + public void init() throws IOException { + super.init(); + + Cell rows = null; + rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS); + size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0; + + if (endRow.length == 0) { // the last split, we don't know the end row + totalRows = 0; // so we just skip it. + if (startRow.length == 0) + startRowId = 0; + else + startRowId = BytesUtil.bytesToInt(startRow); + endRowId = -1; + } else { + if (startRow.length == 0) { // the first split, start row is 0 + totalRows = BytesUtil.bytesToInt(endRow); + startRowId = 0; + endRowId = totalRows; + } else { + startRowId = BytesUtil.bytesToInt(startRow); + endRowId = BytesUtil.bytesToInt(endRow); + totalRows = startRowId - endRowId; + } + } + processedRows = 0; + LOG + .info("Split (" + startRowId + ", " + endRowId + ") -> " + + totalRows); + } + + /** + * @return NullWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public NullWritable createKey() { + return NullWritable.get(); + } + + /** + * @return NullWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public NullWritable createValue() { + return NullWritable.get(); + } + + /** + * @param key NullWritable as input key. 
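+ *            (unused; the rotation updates are written back to the table as a side effect of scanning)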
+ * @param value NullWritable as input value + * + * Converts Scanner.next() to NullWritable, NullWritable + * + * @return true if there was more data + * @throws IOException + */ + public boolean next(NullWritable key, NullWritable value) + throws IOException { + RowResult result; + try { + result = this.scanner.next(); + } catch (UnknownScannerException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + restart(lastRow); + this.scanner.next(); // skip presumed already mapped row + result = this.scanner.next(); + } + + double s1, s2; + VectorUpdate bu; + boolean hasMore = result != null && result.size() > 0; + if (hasMore) { + byte[] row = result.getRow(); + int rowId = BytesUtil.bytesToInt(row); + if (rowId < pivotrow) { + s1 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(rowId), + Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue()); + s2 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(rowId), + Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue()); + + bu = new VectorUpdate(rowId); + bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2); + bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2); + + htable.commit(bu.getBatchUpdate()); + } else if (rowId == pivotrow) { + return true; + } else if (rowId < pivotcol) { + s1 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + rowId)) + .getValue()); + s2 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(rowId), Bytes.toBytes(EICOL + pivotcol)) + .getValue()); + + bu = new VectorUpdate(rowId); + bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2); + htable.commit(bu.getBatchUpdate()); + + bu = new VectorUpdate(pivotrow); + bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2); + htable.commit(bu.getBatchUpdate()); + } else if (rowId == pivotcol) { + for (int i = pivotcol + 1; i < size; i++) { + s1 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + i)) + .getValue()); + s2 = BytesUtil.bytesToDouble(htable.get( + BytesUtil.getRowIndex(pivotcol), Bytes.toBytes(EICOL + i)) + .getValue()); + + bu = new VectorUpdate(pivotcol); + bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2); + htable.commit(bu.getBatchUpdate()); + + bu = new VectorUpdate(pivotrow); + bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2); + htable.commit(bu.getBatchUpdate()); + } + } else { // rowId > pivotcol + return false; + } + + lastRow = row; + processedRows++; + } + return hasMore; + } + + @Override + public float getProgress() { + if (totalRows <= 0) { + return 0; + } else { + return Math.min(1.0f, processedRows / (float) totalRows); + } + } + + } + + public InputSplit[] getSplits(JobConf job, int numSplits) + throws IOException { + InputSplit[] splits = super.getSplits(job, numSplits); + List newSplits = new ArrayList(); + for (InputSplit split : splits) { + TableSplit ts = (TableSplit) split; + byte[] row = ts.getStartRow(); + if (row.length == 0) // the first split + newSplits.add(split); + else { + if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) { + newSplits.add(split); + } + } + } + + return newSplits.toArray(new InputSplit[newSplits.size()]); + } + + @Override + public RecordReader getRecordReader( + InputSplit split, JobConf conf, Reporter reporter) throws IOException { + TableSplit tSplit = (TableSplit) split; + RotationRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new RotationRecordReader(pivot_row, pivot_col, 
pivot_sin, + pivot_cos); + } + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.init(); + return trr; + } + + protected void setTableRecordReader(RotationRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + } +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,70 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.Constants; +import org.apache.hama.io.DoubleEntry; +import org.apache.hama.mapred.VectorInputFormat; + +public class MatrixNormMap extends MapReduceBase implements + Mapper { + private IntWritable nKey = new IntWritable(-1); + private DoubleWritable nValue = new DoubleWritable(); + + public static void initJob(String path, Class map, + Class outputKeyClass, Class outputValueClass, + JobConf jobConf) { + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + + jobConf.setInputFormat(VectorInputFormat.class); + FileInputFormat.addInputPaths(jobConf, path); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + } + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + + double rowSum = 0; + for(Map.Entry e : value.entrySet()) { + rowSum += Math.abs(((DoubleEntry) e.getValue()).getValue()); + } + nValue.set(rowSum); + + output.collect(nKey, nValue); + } + +} Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java?rev=813233&view=auto ============================================================================== --- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java (added) +++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java Thu Sep 10 05:32:52 2009 @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hama.Constants; +import org.apache.hama.io.DoubleEntry; +import org.apache.hama.mapred.VectorInputFormat; + +/** A Catalog class collect all the mr classes to compute the matrix's norm */ +public class MatrixNormMapRed { + + /** + * Initialize the job to compute the matrix's norm + * + * @param inputMatrixPath the input matrix's path + * @param outputPath the output file's name that records the norm of the + * matrix + * @param mapper Mapper + * @param combiner Combiner + * @param reducer Reducer + * @param jobConf Configuration of the job + */ + public static void initJob(String inputMatrixPath, String outputPath, + Class mapper, + Class combiner, + Class reducer, JobConf jobConf) { + jobConf.setMapperClass(mapper); + jobConf.setMapOutputKeyClass(IntWritable.class); + jobConf.setMapOutputValueClass(DoubleWritable.class); + jobConf.setCombinerClass(combiner); + jobConf.setReducerClass(reducer); + jobConf.setOutputKeyClass(IntWritable.class); + jobConf.setOutputValueClass(DoubleWritable.class); + + // input + jobConf.setInputFormat(VectorInputFormat.class); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + FileInputFormat.addInputPaths(jobConf, inputMatrixPath); + // output + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(jobConf, new Path(outputPath)); + } + + /** the interface of norm mapper */ + public static interface MatrixNormMapper extends + Mapper { + IntWritable nKey = new IntWritable(-1); + DoubleWritable nValue = new DoubleWritable(0); + } + + /** the interface of norm reducer/combiner */ + public static interface MatrixNormReducer extends + Reducer { + IntWritable nKey = new IntWritable(-1); + DoubleWritable nValue = new DoubleWritable(0); + } + + // / + // / Infinity Norm + // / + + /** Infinity Norm */ + public static class MatrixInfinityNormMapper extends MapReduceBase implements + MatrixNormMapper { + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + + double rowSum = 0; + for (Map.Entry e : value.entrySet()) { + rowSum += Math.abs(((DoubleEntry) e.getValue()).getValue()); + } + nValue.set(rowSum); + + output.collect(nKey, nValue); + } + + } + + /** + * Matrix Infinity Norm Reducer + */ + public static class MatrixInfinityNormReducer extends MapReduceBase implements + MatrixNormReducer { + + private double max = 0; + + @Override + public void reduce(IntWritable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + while (values.hasNext()) { + max = Math.max(values.next().get(), max); + } + + // 
Note: every mapper emits its row sum under the single key (-1),
+      // so this reducer iterates over exactly one key and simply keeps
+      // the maximum row sum, which is the infinity norm.
+      nValue.set(max);
+      output.collect(nKey, nValue);
+    }
+
+  }
+
+  // /
+  // / One Norm
+  // /
+
+  /** One Norm Mapper */
+  public static class MatrixOneNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+
+      for (Map.Entry e : value.entrySet()) {
+        nValue.set(((DoubleEntry) e.getValue()).getValue());
+        output.collect((IntWritable) e.getKey(), nValue);
+      }
+    }
+  }
+
+  /** One Norm Combiner */
+  public static class MatrixOneNormCombiner extends MapReduceBase implements
+      MatrixNormReducer {
+
+    @Override
+    public void reduce(IntWritable key, Iterator values,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+
+      double partialColSum = 0;
+      while (values.hasNext()) {
+        partialColSum += values.next().get();
+      }
+      nValue.set(partialColSum);
+      output.collect(key, nValue);
+    }
+  }
+
+  /** One Norm Reducer */
+  public static class MatrixOneNormReducer extends MapReduceBase implements
+      MatrixNormReducer {
+    private double max = 0;
+    private Path outDir;
+    private JobConf conf;
+
+    @Override
+    public void configure(JobConf job) {
+      outDir = FileOutputFormat.getOutputPath(job);
+      conf = job;
+    }
+
+    @Override
+    public void reduce(IntWritable key, Iterator values,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      double colSum = 0;
+      while (values.hasNext()) {
+        colSum += values.next().get();
+      }
+
+      max = Math.max(Math.abs(colSum), max);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // write output to a file
+      Path outFile = new Path(outDir, "reduce-out");
+      FileSystem fileSys = FileSystem.get(conf);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          outFile, IntWritable.class, DoubleWritable.class,
+          CompressionType.NONE);
+      writer.append(new IntWritable(-1), new DoubleWritable(max));
+      writer.close();
+    }
+  }
+
+  // /
+  // / Frobenius Norm
+  // /
+
+  /** Frobenius Norm Mapper */
+  public static class MatrixFrobeniusNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      double rowSqrtSum = 0;
+      for (Map.Entry e : value.entrySet()) {
+        double cellValue = ((DoubleEntry) e.getValue()).getValue();
+        rowSqrtSum += (cellValue * cellValue);
+      }
+
+      nValue.set(rowSqrtSum);
+      output.collect(nKey, nValue);
+    }
+  }
+
+  /** Frobenius Norm Combiner */
+  public static class MatrixFrobeniusNormCombiner extends MapReduceBase
+      implements MatrixNormReducer {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator values,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        sqrtSum += values.next().get();
+      }
+      // Note: every mapper emits its row's sum of squares under the single
+      // key (-1), so this combiner only accumulates the partial sum of
+      // squares; the final square root is taken in the reducer.
+      nValue.set(sqrtSum);
+      output.collect(nKey, nValue);
+    }
+  }
+
+  /** Frobenius Norm Reducer */
+  public static class MatrixFrobeniusNormReducer extends MapReduceBase
+      implements MatrixNormReducer {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator values,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        sqrtSum += values.next().get();
+      }
+
+      // Note: all partial sums of squares arrive under the single key (-1),
+      // so this reducer sees exactly one key; the Frobenius norm is the
+      // square root of the accumulated sum.
+      nValue.set(Math.sqrt(sqrtSum));
+      output.collect(nKey, nValue);
+    }
+  }
+
+  // /
+  // / MaxValue Norm
+  // /
+
+  /** MaxValue Norm Mapper */
+  public static class MatrixMaxValueNormMapper extends MapReduceBase implements
+      MatrixNormMapper {
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      double max = 0;
+      for (Map.Entry e : value.entrySet()) {
+        double cellValue = Math.abs(((DoubleEntry) e.getValue()).getValue());
+        max = cellValue > max ? cellValue : max;
+      }
+
+      nValue.set(max);
+      output.collect(nKey, nValue);
+    }
+
+  }
+
+  /** MaxValue Norm Reducer */
+  public static class MatrixMaxValueNormReducer extends MapReduceBase implements
+      MatrixNormReducer {
+    private double max = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterator values,
+        OutputCollector output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        max = Math.max(values.next().get(), max);
+      }
+
+      // Note: every mapper emits its row maximum under the single key (-1),
+      // so this reducer sees exactly one key and keeps the overall maximum
+      // absolute value.
+      nValue.set(max);
+      output.collect(nKey, nValue);
+    }
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class MatrixNormReduce extends MapReduceBase implements
+    Reducer {
+  private double max = 0;
+  private String outDir = "";
+  private JobConf conf;
+  private static final String OUTPUT = "hama.multiplication.matrix.a";
+
+  public void configure(JobConf job) {
+    outDir = job.get(OUTPUT, "");
+    conf = job;
+  }
+
+  public static void initJob(String path, Class reducer,
+      JobConf jobConf) {
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.setReducerClass(reducer);
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(DoubleWritable.class);
+    jobConf.set(OUTPUT, path);
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator values,
+      OutputCollector output, Reporter reporter)
+      throws IOException {
+
+    while (values.hasNext()) {
+      max = Math.max(values.next().get(), max);
+    }
+
+  }
+
+  /**
+   * Called when the reduce task finishes: writes the largest value seen
+   * across all reduce calls to a sequence file in the configured output
+   * directory.
+   */
+  @Override
+  public void close() throws IOException {
+    // write output to a file
+    Path outFile = new Path(outDir, "reduce-out");
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+        outFile, IntWritable.class, DoubleWritable.class, CompressionType.NONE);
+    writer.append(new IntWritable(-1), new DoubleWritable(max));
+    writer.close();
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hama.algebra; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.Constants; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.mapred.VectorInputFormat; +import org.apache.log4j.Logger; + +public class RowCyclicAdditionMap extends MapReduceBase implements +Mapper { + static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class); + protected DenseMatrix[] matrix_summands; + protected double[] matrix_alphas; + public static final String MATRIX_SUMMANDS = "hama.addition.summands"; + public static final String MATRIX_ALPHAS = "hama.addition.alphas"; + + public void configure(JobConf job) { + try { + String[] matrix_names = job.get(MATRIX_SUMMANDS, "").split(","); + String[] matrix_alpha_strs = job.get(MATRIX_ALPHAS, "").split(","); + assert(matrix_names.length == matrix_alpha_strs.length && matrix_names.length >= 1); + + matrix_summands = new DenseMatrix[matrix_names.length]; + matrix_alphas = new double[matrix_names.length]; + for(int i=0; i map, + Class outputKeyClass, Class outputValueClass, + JobConf jobConf) { + + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + jobConf.set(MATRIX_SUMMANDS, matrix_summandlist); + jobConf.set(MATRIX_ALPHAS, matrix_alphalist); + + jobConf.setInputFormat(VectorInputFormat.class); + FileInputFormat.addInputPaths(jobConf, matrix_a); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + } + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + + DenseVector result = new DenseVector(value); + DenseVector summand; + for(int i=0; i { + + /** + * Use this before submitting a TableReduce job. It will appropriately set up + * the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initJob(String table, Class reducer, + JobConf job) { + job.setOutputFormat(VectorOutputFormat.class); + job.setReducerClass(reducer); + job.set(VectorOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(BatchUpdate.class); + } + + @Override + public void reduce(IntWritable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + VectorUpdate update = new VectorUpdate(key.get()); + update.putAll(values.next()); + + output.collect(key, update); + } + +}