Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: (qmail 82033 invoked from network); 11 Feb 2009 01:10:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Feb 2009 01:10:51 -0000 Received: (qmail 22987 invoked by uid 500); 11 Feb 2009 01:10:50 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 22971 invoked by uid 500); 11 Feb 2009 01:10:50 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 22960 invoked by uid 99); 11 Feb 2009 01:10:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2009 17:10:50 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 01:10:43 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 42F282388A16; Wed, 11 Feb 2009 01:10:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r743189 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/mapred/ Date: Wed, 11 Feb 2009 01:10:22 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090211011023.42F282388A16@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Wed Feb 11 01:10:22 2009 New Revision: 743189 URL: http://svn.apache.org/viewvc?rev=743189&view=rev Log: Refactor blockingMapRed Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Removed: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=743189&r1=743188&r2=743189&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Wed Feb 11 01:10:22 2009 @@ -34,6 +34,7 @@ IMPROVEMENTS + HAMA-150: Refactor blockingMapRed (edwardyoon) HAMA-148: Implement of set(double alpha, Matrix B) (edwardyoon) HAMA-100: Implement of set(Matrix B) (edwardyoon) HAMA-144: GetProgress during MR over a matrix (samuel) Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=743189&r1=743188&r2=743189&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Feb 11 01:10:22 2009 @@ -49,9 +49,10 @@ import org.apache.hama.io.MapWritable; import org.apache.hama.io.VectorUpdate; import org.apache.hama.io.VectorWritable; -import org.apache.hama.mapred.BlockingMapRed; +import org.apache.hama.mapred.CollectBlocksMapper; import org.apache.hama.mapred.RandomMatrixMap; import org.apache.hama.mapred.RandomMatrixReduce; +import org.apache.hama.mapred.VectorInputFormat; import org.apache.hama.util.BytesUtil; import org.apache.hama.util.JobManager; import org.apache.hama.util.RandomVariable; @@ -250,7 +251,7 @@ LOG.info("Create the " + m + " * " + n + " random matrix : " + rand.getPath()); rand.setDimension(m, n); - + JobConf jobConf = new JobConf(conf); jobConf.setJobName("random matrix MR job : " + rand.getPath()); @@ -262,9 +263,9 @@ jobConf.setMapperClass(RandomMatrixMap.class); jobConf.setMapOutputKeyClass(IntWritable.class); jobConf.setMapOutputValueClass(VectorWritable.class); - + RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class, - jobConf); + jobConf); jobConf.setSpeculativeExecution(false); jobConf.set("matrix.column", String.valueOf(n)); @@ -381,17 +382,16 @@ } public Matrix mult(Matrix B) throws IOException { - Matrix result = new DenseMatrix(config); - + Matrix result = new DenseMatrix(config); + JobConf jobConf = new JobConf(config); jobConf.setJobName("multiplication MR job : " + result.getPath()); jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); - - SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), - SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class, - jobConf); + + SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), SIMDMultiplyMap.class, + IntWritable.class, VectorWritable.class, jobConf); SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf); JobManager.execute(jobConf, result); @@ -409,28 +409,32 @@ public Matrix mult(Matrix B, int blocks) throws IOException { Matrix collectionTable = new DenseMatrix(config); LOG.info("Collect Blocks"); - collectBlocks(this, collectionTable, blocks, true); - collectBlocks(B, collectionTable, blocks, false); + collectBlocks(this, B, blocks, collectionTable); Matrix result = new DenseMatrix(config); - + JobConf jobConf = new JobConf(config); jobConf.setJobName("multiplication MR job : " + result.getPath()); jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); - - BlockMultiplyMap.initJob(collectionTable.getPath(), - BlockMultiplyMap.class, BlockID.class, BlockWritable.class, + + BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class, + BlockID.class, BlockWritable.class, jobConf); + BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class, jobConf); - BlockMultiplyReduce.initJob(result.getPath(), - BlockMultiplyReduce.class, jobConf); JobManager.execute(jobConf, result); // Should be collectionTable removed? return result; } + private void collectBlocks(Matrix a, Matrix b, int blocks, + Matrix collectionTable) throws IOException { + collectBlocksMapRed(a.getPath(), collectionTable, blocks, true); + collectBlocksMapRed(b.getPath(), collectionTable, blocks, false); + } + public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException { // TODO Auto-generated method stub return null; @@ -471,8 +475,8 @@ cols[jj] = BytesUtil.getColumnIndex(j); } - Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), - BytesUtil.getRowIndex(i1 + 1)); + Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), BytesUtil + .getRowIndex(i1 + 1)); Iterator it = scan.iterator(); int i = 0; RowResult rs = null; @@ -491,13 +495,13 @@ /** * Collect Blocks * - * @param resource + * @param path * @param collectionTable * @param blockNum * @param bool * @throws IOException */ - public void collectBlocks(Matrix resource, Matrix collectionTable, + public void collectBlocksMapRed(String path, Matrix collectionTable, int blockNum, boolean bool) throws IOException { double blocks = Math.pow(blockNum, 0.5); if (!String.valueOf(blocks).endsWith(".0")) @@ -505,15 +509,21 @@ int block_size = (int) blocks; collectionTable.setDimension(block_size, block_size); - + JobConf jobConf = new JobConf(config); jobConf.setJobName("Blocking MR job" + getPath()); jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); + jobConf.setMapperClass(CollectBlocksMapper.class); + jobConf.setInputFormat(VectorInputFormat.class); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + + FileInputFormat.addInputPaths(jobConf, path); + + CollectBlocksMapper.initJob(collectionTable.getPath(), bool, block_size, + this.getRows(), this.getColumns(), jobConf); - BlockingMapRed.initJob(resource.getPath(), collectionTable.getPath(), - bool, block_size, this.getRows(), this.getColumns(), jobConf); JobManager.execute(jobConf); } } Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java?rev=743189&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java Wed Feb 11 01:10:22 2009 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorWritable; + +/** + * Abstract Blocking Map/Reduce Class to configure the job. + */ +public abstract class CollectBlocksBase extends MapReduceBase { + /** Parameter of the path of the matrix to be blocked * */ + public static final String BLOCK_SIZE = "hama.blocking.size"; + public static final String ROWS = "hama.blocking.rows"; + public static final String COLUMNS = "hama.blocking.columns"; + public static final String MATRIX_POS = "a.ore.b"; + + protected int mBlockNum; + protected int mBlockRowSize; + protected int mBlockColSize; + protected int mRows; + protected int mColumns; + protected boolean matrixPos; + + @Override + public void configure(JobConf job) { + mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, "")); + mRows = Integer.parseInt(job.get(ROWS, "")); + mColumns = Integer.parseInt(job.get(COLUMNS, "")); + + mBlockRowSize = mRows / mBlockNum; + mBlockColSize = mColumns / mBlockNum; + + matrixPos = job.getBoolean(MATRIX_POS, true); + } + + /** + * Initialize a job to blocking a table + */ + @SuppressWarnings("unchecked") + public static void initJob(String collectionTable, boolean bool, + int block_size, int i, int j, JobConf job) { + job.setReducerClass(CollectBlocksReducer.class); + job.setMapOutputKeyClass(BlockID.class); + job.setMapOutputValueClass(VectorWritable.class); + + job.setOutputFormat(BlockOutputFormat.class); + job.setOutputKeyClass(BlockID.class); + job.setOutputValueClass(BlockWritable.class); + job.set(BLOCK_SIZE, String.valueOf(block_size)); + job.set(ROWS, String.valueOf(i)); + job.set(COLUMNS, String.valueOf(j)); + job.setBoolean(MATRIX_POS, bool); + job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable); + + if (bool) + job.set(BlockOutputFormat.COLUMN, "a"); + else + job.set(BlockOutputFormat.COLUMN, "b"); + } +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java?rev=743189&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java Wed Feb 11 01:10:22 2009 @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.VectorWritable; + +@SuppressWarnings("unchecked") +public interface CollectBlocksMap + extends Mapper { +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=743189&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Wed Feb 11 01:10:22 2009 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.DenseVector; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.VectorWritable; + +/** + * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix + */ +public class CollectBlocksMapper extends CollectBlocksBase implements + CollectBlocksMap { + + @Override + public void map(IntWritable key, VectorWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + int startColumn; + int endColumn; + int blkRow = key.get() / mBlockRowSize; + DenseVector dv = value.getDenseVector(); + + int i = 0; + do { + startColumn = i * mBlockColSize; + endColumn = startColumn + mBlockColSize - 1; + if (endColumn >= mColumns) // the last sub vector + endColumn = mColumns - 1; + output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), dv + .subVector(startColumn, endColumn))); + + i++; + } while (endColumn < (mColumns - 1)); + } + +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=743189&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Wed Feb 11 01:10:22 2009 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.SubMatrix; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorWritable; + +/** + * Rows are named as c(i, j) with sequential number ((N^2 * i) + ((j * N) + k) + * to avoid duplicated records. Each row has a two sub matrices of a(i, k) and + * b(k, j). + */ +public class CollectBlocksReducer extends CollectBlocksBase implements + Reducer { + + @Override + public void reduce(BlockID key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + // Note: all the sub-vectors are grouped by {@link + // org.apache.hama.io.BlockID} + + // the block's base offset in the original matrix + int colBase = key.getColumn() * mBlockColSize; + int rowBase = key.getRow() * mBlockRowSize; + + // the block's size : rows & columns + int smRows = mBlockRowSize; + if ((rowBase + mBlockRowSize - 1) >= mRows) + smRows = mRows - rowBase; + int smCols = mBlockColSize; + if ((colBase + mBlockColSize - 1) >= mColumns) + smCols = mColumns - colBase; + + // construct the matrix + SubMatrix subMatrix = new SubMatrix(smRows, smCols); + + // i, j is the current offset in the sub-matrix + int i = 0, j = 0; + while (values.hasNext()) { + VectorWritable vw = values.next(); + // check the size is suitable + if (vw.size() != smCols) + throw new IOException("Block Column Size dismatched."); + i = vw.row - rowBase; + if (i >= smRows || i < 0) + throw new IOException("Block Row Size dismatched."); + + // put the subVector to the subMatrix + for (j = 0; j < smCols; j++) { + subMatrix.set(i, j, vw.get(colBase + j)); + } + } + BlockWritable outValue = new BlockWritable(subMatrix); + + // It'll used for only matrix multiplication. + if (matrixPos) { + for (int x = 0; x < mBlockNum; x++) { + int r = (key.getRow() * mBlockNum) * mBlockNum; + int seq = (x * mBlockNum) + key.getColumn() + r; + output.collect(new BlockID(key.getRow(), x, seq), outValue); + } + } else { + for (int x = 0; x < mBlockNum; x++) { + int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum) + + key.getRow(); + output.collect(new BlockID(x, key.getColumn(), seq), outValue); + } + } + } +}