Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: (qmail 84060 invoked from network); 10 Mar 2009 00:37:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Mar 2009 00:37:12 -0000 Received: (qmail 17106 invoked by uid 500); 10 Mar 2009 00:37:12 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 17081 invoked by uid 500); 10 Mar 2009 00:37:11 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 17070 invoked by uid 99); 10 Mar 2009 00:37:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Mar 2009 17:37:11 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Mar 2009 00:37:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6C39123888F4; Tue, 10 Mar 2009 00:36:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r751922 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/DenseMatrix.java src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java src/test/org/apache/hama/TestDenseMatrix.java Date: Tue, 10 Mar 2009 00:36:44 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090310003644.6C39123888F4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Tue Mar 10 00:36:43 2009 New Revision: 751922 URL: http://svn.apache.org/viewvc?rev=751922&view=rev Log: Combine multi-mapreduce jobs into a single mapreduce job while computing such as 'C=alpha*B + A' Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=751922&r1=751921&r2=751922&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Tue Mar 10 00:36:43 2009 @@ -36,6 +36,8 @@ IMPROVEMENTS + HAMA-154: Combine multi-mapreduce jobs into a single mapreduce job + while computing (samuel via edwardyoon) HAMA-152: Replace MapWritable to hadoop.io.MapWritable (edwardyoon) HAMA-109: Implement add(double alpha, Matrix B) (edwardyoon) HAMA-150: Refactor blockingMapRed (edwardyoon) Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=751922&r1=751921&r2=751922&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Tue Mar 10 00:36:43 2009 @@ -96,7 +96,7 @@ // if force is set to true: // 1) if this matrixName has aliase to other matrix, we will remove // the old aliase, create a new matrix table, and aliase to it. - + // 2) if this matrixName has no aliase to other matrix, we will create // a new matrix table, and alise to it. // @@ -306,13 +306,13 @@ * @throws IOException */ public double get(int i, int j) throws IOException { - if(this.getRows() < i || this.getColumns() < j) - throw new ArrayIndexOutOfBoundsException(i +", "+ j); - + if (this.getRows() < i || this.getColumns() < j) + throw new ArrayIndexOutOfBoundsException(i + ", " + j); + Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j)); - if(c == null) + if (c == null) throw new NullPointerException("Unexpected null"); - + return BytesUtil.bytesToDouble(c.getValue()); } @@ -326,7 +326,7 @@ public DenseVector getRow(int i) throws IOException { return new DenseVector(table.getRow(BytesUtil.getRowIndex(i))); } - + /** * Gets the vector of column * @@ -342,8 +342,8 @@ MapWritable trunk = new MapWritable(); for (RowResult row : scan) { - trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())), new DoubleEntry(row - .get(columnKey))); + trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())), + new DoubleEntry(row.get(columnKey))); } return new DenseVector(trunk); @@ -355,7 +355,7 @@ update.put(j, value); table.commit(update.getBatchUpdate()); } - + /** * Set the row of a matrix to a given vector * @@ -364,9 +364,9 @@ * @throws IOException */ public void setRow(int row, Vector vector) throws IOException { - if(this.getRows() < row) + if (this.getRows() < row) increaseRows(); - + VectorUpdate update = new VectorUpdate(row); update.putAll(((DenseVector) vector).getEntries()); table.commit(update.getBatchUpdate()); @@ -382,7 +382,7 @@ public void setColumn(int column, Vector vector) throws IOException { if (this.getColumns() < column) increaseColumns(); - + for (int i = 0; i < vector.size(); i++) { VectorUpdate update = new VectorUpdate(i); update.put(column, vector.get(i)); @@ -391,15 +391,16 @@ } /** - * C = B + A + * C = alpha*B + A * + * @param alpha * @param B * @return C * @throws IOException */ - public DenseMatrix add(Matrix B) throws IOException { + public DenseMatrix add(double alpha, Matrix B) throws IOException { ensureForAddition(B); - + DenseMatrix result = new DenseMatrix(config); JobConf jobConf = new JobConf(config); jobConf.setJobName("addition MR job" + result.getPath()); @@ -407,9 +408,9 @@ jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); - RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), - RowCyclicAdditionMap.class, IntWritable.class, MapWritable.class, - jobConf); + RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), Double + .toString(alpha), RowCyclicAdditionMap.class, IntWritable.class, + MapWritable.class, jobConf); RowCyclicAdditionReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class, jobConf); @@ -418,28 +419,57 @@ } /** - * C = alpha*B + A + * C = B + A * - * @param alpha * @param B * @return C * @throws IOException */ - public DenseMatrix add(double alpha, Matrix B) throws IOException { - ensureForAddition(B); - - DenseMatrix temp = new DenseMatrix(config); - temp.set(alpha, B); - DenseMatrix result = this.add(temp); + public DenseMatrix add(Matrix B) throws IOException { + return add(1.0, B); + } + + public DenseMatrix add(Matrix... matrices) throws IOException { + // ensure all the matrices are suitable for addition. + for (Matrix m : matrices) { + ensureForAddition(m); + } + + DenseMatrix result = new DenseMatrix(config); + JobConf jobConf = new JobConf(config); + jobConf.setJobName("addition MR job" + result.getPath()); + + jobConf.setNumMapTasks(config.getNumMapTasks()); + jobConf.setNumReduceTasks(config.getNumReduceTasks()); + + StringBuilder summandList = new StringBuilder(); + StringBuilder alphaList = new StringBuilder(); + for (Matrix m : matrices) { + summandList.append(m.getPath()); + summandList.append(","); + alphaList.append("1"); + alphaList.append(","); + } + summandList.deleteCharAt(summandList.length() - 1); + alphaList.deleteCharAt(alphaList.length() - 1); + + RowCyclicAdditionMap.initJob(this.getPath(), summandList.toString(), + alphaList.toString(), RowCyclicAdditionMap.class, IntWritable.class, + MapWritable.class, jobConf); + RowCyclicAdditionReduce.initJob(result.getPath(), + RowCyclicAdditionReduce.class, jobConf); + + JobManager.execute(jobConf, result); return result; } - + private void ensureForAddition(Matrix m) throws IOException { - if(getRows()!= m.getRows() || getColumns() != m.getColumns()) { - throw new IOException("Matrices' rows and columns should be same while A+B."); + if (getRows() != m.getRows() || getColumns() != m.getColumns()) { + throw new IOException( + "Matrices' rows and columns should be same while A+B."); } } - + /** * C = A*B using SIMD algorithm * @@ -449,7 +479,7 @@ */ public DenseMatrix mult(Matrix B) throws IOException { ensureForMultiplication(B); - + DenseMatrix result = new DenseMatrix(config); JobConf jobConf = new JobConf(config); @@ -458,8 +488,8 @@ jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); - SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(), SIMDMultiplyMap.class, - IntWritable.class, MapWritable.class, jobConf); + SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), this.getType(), + SIMDMultiplyMap.class, IntWritable.class, MapWritable.class, jobConf); SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf); JobManager.execute(jobConf, result); @@ -476,13 +506,13 @@ */ public DenseMatrix mult(Matrix B, int blocks) throws IOException { ensureForMultiplication(B); - + DenseMatrix collectionTable = new DenseMatrix(config); LOG.info("Collect Blocks"); collectBlocksMapRed(this.getPath(), collectionTable, blocks, true); collectBlocksMapRed(B.getPath(), collectionTable, blocks, false); - + DenseMatrix result = new DenseMatrix(config); JobConf jobConf = new JobConf(config); @@ -500,9 +530,9 @@ // Should be collectionTable removed? return result; } - + private void ensureForMultiplication(Matrix m) throws IOException { - if(getColumns() != m.getRows()) { + if (getColumns() != m.getRows()) { throw new IOException("A's columns should equal with B's rows while A*B."); } } @@ -541,8 +571,8 @@ } /** - * Returns the sub matrix formed by selecting certain rows and - * columns from a bigger matrix. The sub matrix is a in-memory operation only. + * Returns the sub matrix formed by selecting certain rows and columns from a + * bigger matrix. The sub matrix is a in-memory operation only. * * @param i0 the start index of row * @param i1 the end index of row Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=751922&r1=751921&r2=751922&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Tue Mar 10 00:36:43 2009 @@ -39,25 +39,38 @@ public class RowCyclicAdditionMap extends MapReduceBase implements Mapper { static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class); - protected DenseMatrix matrix_b; - public static final String MATRIX_B = "hama.addition.matrix.b"; + protected DenseMatrix[] matrix_summands; + protected double[] matrix_alphas; + public static final String MATRIX_SUMMANDS = "hama.addition.summands"; + public static final String MATRIX_ALPHAS = "hama.addition.alphas"; public void configure(JobConf job) { try { - matrix_b = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_B, "")); + String[] matrix_names = job.get(MATRIX_SUMMANDS, "").split(","); + String[] matrix_alpha_strs = job.get(MATRIX_ALPHAS, "").split(","); + assert(matrix_names.length == matrix_alpha_strs.length && matrix_names.length >= 1); + + matrix_summands = new DenseMatrix[matrix_names.length]; + matrix_alphas = new double[matrix_names.length]; + for(int i=0; i map, Class outputKeyClass, - Class outputValueClass, JobConf jobConf) { + public static void initJob(String matrix_a, String matrix_summandlist, + String matrix_alphalist, Class map, + Class outputKeyClass, Class outputValueClass, + JobConf jobConf) { jobConf.setMapOutputValueClass(outputValueClass); jobConf.setMapOutputKeyClass(outputKeyClass); jobConf.setMapperClass(map); - jobConf.set(MATRIX_B, matrix_b); + jobConf.set(MATRIX_SUMMANDS, matrix_summandlist); + jobConf.set(MATRIX_ALPHAS, matrix_alphalist); jobConf.setInputFormat(VectorInputFormat.class); FileInputFormat.addInputPaths(jobConf, matrix_a); @@ -69,10 +82,13 @@ OutputCollector output, Reporter reporter) throws IOException { - DenseVector a = matrix_b.getRow(key.get()); - DenseVector b = new DenseVector(value); - DenseVector c = a.add(b); - output.collect(key, c.getEntries()); + DenseVector result = new DenseVector(value); + DenseVector summand; + for(int i=0; i