Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@locus.apache.org Received: (qmail 77692 invoked from network); 5 Jan 2009 00:30:27 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Jan 2009 00:30:27 -0000 Received: (qmail 77883 invoked by uid 500); 5 Jan 2009 00:30:27 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 77866 invoked by uid 500); 5 Jan 2009 00:30:27 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 77855 invoked by uid 99); 5 Jan 2009 00:30:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jan 2009 16:30:27 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jan 2009 00:30:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 69435238899E; Sun, 4 Jan 2009 16:30:06 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r731394 - in /incubator/hama/trunk/src/java/org/apache/hama: Constants.java DenseMatrix.java algebra/BlockCyclicMultiplyMap.java Date: Mon, 05 Jan 2009 00:30:06 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090105003006.69435238899E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Sun Jan 4 16:30:05 2009 New Revision: 731394 URL: http://svn.apache.org/viewvc?rev=731394&view=rev Log: Bug of scanner range in block multiplication Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=731394&r1=731393&r2=731394&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Jan 4 16:30:05 2009 @@ -91,4 +91,6 @@ public static final String BLOCK = "block:"; public static final String BLOCK_PATH = "attribute:blockPath"; + + public static final String BLOCK_SIZE = "attribute:blockSize"; } Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=731394&r1=731393&r2=731394&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Sun Jan 4 16:30:05 2009 @@ -395,7 +395,7 @@ if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) { BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(), - ((DenseMatrix) B).getBlockedMatrixPath(), + ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(), BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class, jobConf); BlockCyclicMultiplyReduce.initJob(result.getPath(), @@ -506,7 +506,7 @@ int block_size = (int) blocks; Matrix blockedMatrix = new DenseMatrix(config); blockedMatrix.setDimension(block_size, block_size); - this.setBlockedMatrixPath(blockedMatrix.getPath()); + this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size); JobConf jobConf = new JobConf(config); jobConf.setJobName("Blocking MR job" + getPath()); @@ -524,9 +524,15 @@ Constants.BLOCK_PATH).getValue()); } - protected void setBlockedMatrixPath(String path) throws IOException { + protected void setBlockedMatrixPath(String path, int size) throws IOException { BatchUpdate update = new BatchUpdate(Constants.METADATA); update.put(Constants.BLOCK_PATH, Bytes.toBytes(path)); + update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size)); table.commit(update); } + + public int getBlockedMatrixSize() throws IOException { + return Bytes.toInt(table.get(Constants.METADATA, + Constants.BLOCK_SIZE).getValue()); + } } Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=731394&r1=731393&r2=731394&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Sun Jan 4 16:30:05 2009 @@ -32,8 +32,6 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hama.Constants; -import org.apache.hama.DenseMatrix; -import org.apache.hama.HamaConfiguration; import org.apache.hama.SubMatrix; import org.apache.hama.io.BlockID; import org.apache.hama.io.BlockWritable; @@ -43,25 +41,29 @@ public class BlockCyclicMultiplyMap extends MapReduceBase implements Mapper { static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class); - protected DenseMatrix matrix_b; + protected HTable table; + protected int block_size; public static final String MATRIX_B = "hama.multiplication.matrix.b"; + public static final String BLOCK_SIZE = "hama.multiplication.block.size"; public void configure(JobConf job) { try { - matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, "")); + table = new HTable(job.get(MATRIX_B, "")); + block_size = Integer.parseInt(job.get(BLOCK_SIZE, "")); } catch (IOException e) { LOG.warn("Load matrix_b failed : " + e.getMessage()); } } public static void initJob(String matrix_a, String matrix_b, - Class map, Class outputKeyClass, + int block_size, Class map, Class outputKeyClass, Class outputValueClass, JobConf jobConf) { jobConf.setMapOutputValueClass(outputValueClass); jobConf.setMapOutputKeyClass(outputKeyClass); jobConf.setMapperClass(map); jobConf.set(MATRIX_B, matrix_b); + jobConf.set(BLOCK_SIZE, String.valueOf(block_size)); jobConf.setInputFormat(BlockInputFormat.class); FileInputFormat.addInputPaths(jobConf, matrix_a); @@ -73,25 +75,36 @@ public void map(BlockID key, BlockWritable value, OutputCollector output, Reporter reporter) throws IOException { - // we don't need to get blockSize each time - int blockSize = matrix_b.getRows(); SubMatrix a = value.get(); - HTable table = matrix_b.getHTable(); - // startKey : new BlockID(key.getColumn(), 0).toString() - // endKey : new BlockID(key.getColumn(), blockSize+1).toString() - Scanner scan = table.getScanner(new byte[][] { Bytes - .toBytes(Constants.BLOCK) }, - new BlockID(key.getColumn(), 0).getBytes(), new BlockID( - key.getColumn(), blockSize + 1).getBytes()); + BlockID startBlock = new BlockID(key.getColumn(), 0); + BlockID endBlock = new BlockID(key.getColumn() + 1, 0); + Scanner scan; + if ((key.getColumn() + 1) == block_size) { + scan = table.getScanner(new byte[][] { Bytes + .toBytes(Constants.BLOCK) }, startBlock.getBytes()); + } else { + scan = table.getScanner(new byte[][] { Bytes + .toBytes(Constants.BLOCK) }, startBlock.getBytes(), endBlock + .getBytes()); + } + int blocks = 0; for (RowResult row : scan) { BlockID bid = new BlockID(row.getRow()); SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue()); SubMatrix c = a.mult(b); output.collect(new BlockID(key.getRow(), bid.getColumn()), new BlockWritable(c)); + blocks++; } + + if (blocks == 0) + throw new IOException("There is no matrix b." + + "\ntableName: " + new String(table.getTableName()) + + "\nscanner startKey: " + startBlock.toString() + ", endKey: " + + endBlock.toString()); + scan.close(); } }