Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@locus.apache.org Received: (qmail 60759 invoked from network); 30 Dec 2008 11:53:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 30 Dec 2008 11:53:18 -0000 Received: (qmail 71937 invoked by uid 500); 30 Dec 2008 11:53:18 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 71918 invoked by uid 500); 30 Dec 2008 11:53:18 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 71905 invoked by uid 99); 30 Dec 2008 11:53:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Dec 2008 03:53:18 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Dec 2008 11:53:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DC450238889B; Tue, 30 Dec 2008 03:52:48 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r730103 - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/io/ java/org/apache/hama/mapred/ test/org/apache/hama/ test/org/apache/hama/mapred/ Date: Tue, 30 Dec 2008 11:52:48 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081230115248.DC450238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Tue Dec 30 03:52:47 2008 New Revision: 730103 URL: http://svn.apache.org/viewvc?rev=730103&view=rev Log: Fix OOME Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java incubator/hama/trunk/src/java/org/apache/hama/Constants.java incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Dec 30 03:52:47 2008 @@ -1,228 +1,233 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama; - -import java.io.IOException; - -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hama.io.VectorUpdate; -import org.apache.hama.util.BytesUtil; -import org.apache.log4j.Logger; - -/** - * Methods of the matrix classes - */ -public abstract class AbstractMatrix implements Matrix { - static final Logger LOG = Logger.getLogger(AbstractMatrix.class); - - protected HamaConfiguration config; - protected HBaseAdmin admin; - // a matrix just need a table path to point to the table which stores matrix. - // let HamaAdmin manage Matrix Name space. - protected String matrixPath; - protected HTable table; - protected HTableDescriptor tableDesc; - protected HamaAdmin hamaAdmin; - - protected boolean closed = true; - - /** - * Sets the job configuration - * - * @param conf configuration object - * @throws MasterNotRunningException - */ - public void setConfiguration(HamaConfiguration conf) - throws MasterNotRunningException { - this.config = conf; - this.admin = new HBaseAdmin(config); - - hamaAdmin = new HamaAdminImpl(conf, admin); - } - - /** - * Create matrix space - */ - protected void create() throws IOException { - // It should run only when table doesn't exist. - if (!admin.tableExists(matrixPath)) { - this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN)); - this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE)); - this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY)); - this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK)); - - LOG.info("Initializing the matrix storage."); - this.admin.createTable(this.tableDesc); - LOG.info("Create Matrix " + matrixPath); - - // connect to the table. - table = new HTable(config, matrixPath); - // Record the matrix type in METADATA_TYPE - BatchUpdate update = new BatchUpdate(Constants.METADATA); - update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass() - .getSimpleName())); - - table.commit(update); - - // the new matrix's reference is 1. - setReference(1); - } - } - - public HTable getHTable() { - return this.table; - } - - /** {@inheritDoc} */ - public int getRows() throws IOException { - Cell rows = null; - rows = table.get(Constants.METADATA, Constants.METADATA_ROWS); - return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0; - } - - /** {@inheritDoc} */ - public int getColumns() throws IOException { - Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS); - return BytesUtil.bytesToInt(columns.getValue()); - } - - /** {@inheritDoc} */ - public void set(int i, int j, double value) throws IOException { - VectorUpdate update = new VectorUpdate(i); - update.put(j, value); - table.commit(update.getBatchUpdate()); - } - - /** {@inheritDoc} */ - public void add(int i, int j, double value) throws IOException { - VectorUpdate update = new VectorUpdate(i); - update.put(j, value + this.get(i, j)); - table.commit(update.getBatchUpdate()); - } - - /** {@inheritDoc} */ - public void setDimension(int rows, int columns) throws IOException { - VectorUpdate update = new VectorUpdate(Constants.METADATA); - update.put(Constants.METADATA_ROWS, rows); - update.put(Constants.METADATA_COLUMNS, columns); - - table.commit(update.getBatchUpdate()); - } - - public String getRowLabel(int row) throws IOException { - Cell rows = null; - rows = table.get(BytesUtil.getRowIndex(row), Bytes - .toBytes(Constants.ATTRIBUTE + "string")); - - return (rows != null) ? Bytes.toString(rows.getValue()) : null; - } - - public void setRowLabel(int row, String name) throws IOException { - VectorUpdate update = new VectorUpdate(row); - update.put(Constants.ATTRIBUTE + "string", name); - table.commit(update.getBatchUpdate()); - } - - public String getColumnLabel(int column) throws IOException { - Cell rows = null; - rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column)); - return (rows != null) ? Bytes.toString(rows.getValue()) : null; - } - - public void setColumnLabel(int column, String name) throws IOException { - VectorUpdate update = new VectorUpdate(Constants.CINDEX); - update.put(column, name); - table.commit(update.getBatchUpdate()); - } - - /** {@inheritDoc} */ - public String getPath() { - return matrixPath; - } - - protected void setReference(int reference) throws IOException { - BatchUpdate update = new BatchUpdate(Constants.METADATA); - update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference)); - table.commit(update); - } - - protected int incrementAndGetRef() throws IOException { - int reference = 1; - Cell rows = null; - rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE); - if (rows != null) { - reference = Bytes.toInt(rows.getValue()); - reference++; - } - setReference(reference); - return reference; - } - - protected int decrementAndGetRef() throws IOException { - int reference = 0; - Cell rows = null; - rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE); - if (rows != null) { - reference = Bytes.toInt(rows.getValue()); - if (reference > 0) // reference==0, we need not to decrement it. - reference--; - } - setReference(reference); - return reference; - } - - protected boolean hasAliaseName() throws IOException { - Cell rows = null; - rows = table.get(Constants.METADATA, Constants.ALIASENAME); - return (rows != null) ? true : false; - } - - public void close() throws IOException { - if (closed) // have been closed - return; - int reference = decrementAndGetRef(); - if (reference <= 0) { // no reference again. - if (!hasAliaseName()) { // the table has not been aliased, we delete the - // table. - if (admin.isTableEnabled(matrixPath)) { - admin.disableTable(matrixPath); - admin.deleteTable(matrixPath); - } - } - } - closed = true; - } - - public boolean save(String aliasename) throws IOException { - // mark & update the aliase name in "alise:name" meta column. - // ! one matrix has only one aliasename now. - BatchUpdate update = new BatchUpdate(Constants.METADATA); - update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename)); - table.commit(update); - return hamaAdmin.save(this, aliasename); - } -} +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.util.BytesUtil; +import org.apache.log4j.Logger; +import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType; +import org.apache.hadoop.hbase.HConstants; + +/** + * Methods of the matrix classes + */ +public abstract class AbstractMatrix implements Matrix { + static final Logger LOG = Logger.getLogger(AbstractMatrix.class); + + protected HamaConfiguration config; + protected HBaseAdmin admin; + // a matrix just need a table path to point to the table which stores matrix. + // let HamaAdmin manage Matrix Name space. + protected String matrixPath; + protected HTable table; + protected HTableDescriptor tableDesc; + protected HamaAdmin hamaAdmin; + + protected boolean closed = true; + + /** + * Sets the job configuration + * + * @param conf configuration object + * @throws MasterNotRunningException + */ + public void setConfiguration(HamaConfiguration conf) + throws MasterNotRunningException { + this.config = conf; + this.admin = new HBaseAdmin(config); + + hamaAdmin = new HamaAdminImpl(conf, admin); + } + + /** + * Create matrix space + */ + protected void create() throws IOException { + // It should run only when table doesn't exist. + if (!admin.tableExists(matrixPath)) { + this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN)); + this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE)); + this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY)); + this.tableDesc.addFamily(new HColumnDescriptor( + Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE, + false, false, Integer.MAX_VALUE, HConstants.FOREVER, false + )); + + LOG.info("Initializing the matrix storage."); + this.admin.createTable(this.tableDesc); + LOG.info("Create Matrix " + matrixPath); + + // connect to the table. + table = new HTable(config, matrixPath); + // Record the matrix type in METADATA_TYPE + BatchUpdate update = new BatchUpdate(Constants.METADATA); + update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass() + .getSimpleName())); + + table.commit(update); + + // the new matrix's reference is 1. + setReference(1); + } + } + + public HTable getHTable() { + return this.table; + } + + /** {@inheritDoc} */ + public int getRows() throws IOException { + Cell rows = null; + rows = table.get(Constants.METADATA, Constants.METADATA_ROWS); + return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0; + } + + /** {@inheritDoc} */ + public int getColumns() throws IOException { + Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS); + return BytesUtil.bytesToInt(columns.getValue()); + } + + /** {@inheritDoc} */ + public void set(int i, int j, double value) throws IOException { + VectorUpdate update = new VectorUpdate(i); + update.put(j, value); + table.commit(update.getBatchUpdate()); + } + + /** {@inheritDoc} */ + public void add(int i, int j, double value) throws IOException { + VectorUpdate update = new VectorUpdate(i); + update.put(j, value + this.get(i, j)); + table.commit(update.getBatchUpdate()); + } + + /** {@inheritDoc} */ + public void setDimension(int rows, int columns) throws IOException { + VectorUpdate update = new VectorUpdate(Constants.METADATA); + update.put(Constants.METADATA_ROWS, rows); + update.put(Constants.METADATA_COLUMNS, columns); + + table.commit(update.getBatchUpdate()); + } + + public String getRowLabel(int row) throws IOException { + Cell rows = null; + rows = table.get(BytesUtil.getRowIndex(row), Bytes + .toBytes(Constants.ATTRIBUTE + "string")); + + return (rows != null) ? Bytes.toString(rows.getValue()) : null; + } + + public void setRowLabel(int row, String name) throws IOException { + VectorUpdate update = new VectorUpdate(row); + update.put(Constants.ATTRIBUTE + "string", name); + table.commit(update.getBatchUpdate()); + } + + public String getColumnLabel(int column) throws IOException { + Cell rows = null; + rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column)); + return (rows != null) ? Bytes.toString(rows.getValue()) : null; + } + + public void setColumnLabel(int column, String name) throws IOException { + VectorUpdate update = new VectorUpdate(Constants.CINDEX); + update.put(column, name); + table.commit(update.getBatchUpdate()); + } + + /** {@inheritDoc} */ + public String getPath() { + return matrixPath; + } + + protected void setReference(int reference) throws IOException { + BatchUpdate update = new BatchUpdate(Constants.METADATA); + update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference)); + table.commit(update); + } + + protected int incrementAndGetRef() throws IOException { + int reference = 1; + Cell rows = null; + rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE); + if (rows != null) { + reference = Bytes.toInt(rows.getValue()); + reference++; + } + setReference(reference); + return reference; + } + + protected int decrementAndGetRef() throws IOException { + int reference = 0; + Cell rows = null; + rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE); + if (rows != null) { + reference = Bytes.toInt(rows.getValue()); + if (reference > 0) // reference==0, we need not to decrement it. + reference--; + } + setReference(reference); + return reference; + } + + protected boolean hasAliaseName() throws IOException { + Cell rows = null; + rows = table.get(Constants.METADATA, Constants.ALIASENAME); + return (rows != null) ? true : false; + } + + public void close() throws IOException { + if (closed) // have been closed + return; + int reference = decrementAndGetRef(); + if (reference <= 0) { // no reference again. + if (!hasAliaseName()) { // the table has not been aliased, we delete the + // table. + if (admin.isTableEnabled(matrixPath)) { + admin.disableTable(matrixPath); + admin.deleteTable(matrixPath); + } + } + } + closed = true; + } + + public boolean save(String aliasename) throws IOException { + // mark & update the aliase name in "alise:name" meta column. + // ! one matrix has only one aliasename now. + BatchUpdate update = new BatchUpdate(Constants.METADATA); + update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename)); + table.commit(update); + return hamaAdmin.save(this, aliasename); + } +} Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Dec 30 03:52:47 2008 @@ -87,15 +87,8 @@ /** default try times to generate a suitable tablename */ public static final int DEFAULT_TRY_TIMES = 10000000; - /** - * block position column to store - * {@link org.apache.hama.io.BlockPosition} object - */ - public static final String BLOCK_POSITION = "attribute:blockPosition"; - /** block data column */ public static final String BLOCK = "block:"; - /** block size */ - public static final String BLOCK_SIZE = "attribute:blockSize"; + public static final String BLOCK_PATH = "attribute:blockPath"; } Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Tue Dec 30 03:52:47 2008 @@ -49,7 +49,6 @@ import org.apache.hama.algebra.SIMDMultiplyMap; import org.apache.hama.algebra.SIMDMultiplyReduce; import org.apache.hama.io.BlockID; -import org.apache.hama.io.BlockPosition; import org.apache.hama.io.BlockWritable; import org.apache.hama.io.DoubleEntry; import org.apache.hama.io.MapWritable; @@ -395,7 +394,8 @@ jobConf.setNumReduceTasks(config.getNumReduceTasks()); if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) { - BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(), + BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(), + ((DenseMatrix) B).getBlockedMatrixPath(), BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class, jobConf); BlockCyclicMultiplyReduce.initJob(result.getPath(), @@ -477,7 +477,7 @@ } public boolean isBlocked() throws IOException { - return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false + return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false : true; } @@ -486,64 +486,12 @@ Bytes.toBytes(Constants.BLOCK)).getValue()); } - /** - * @return the size of block - * @throws IOException - */ - public int getBlockSize() throws IOException { - return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA, - Constants.BLOCK_SIZE).getValue()) : -1; - } - - protected void setBlockSize(int blockNum) throws IOException { - BatchUpdate update = new BatchUpdate(Constants.METADATA); - update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum)); - table.commit(update); - } - public void setBlock(int i, int j, SubMatrix matrix) throws IOException { BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes()); update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes()); table.commit(update); } - - protected void setBlockPosition(int blockNum) throws IOException { - int block_row_size = this.getRows() / blockNum; - int block_column_size = this.getColumns() / blockNum; - - int startRow, endRow, startColumn, endColumn; - int i = 0, j = 0; - do { - startRow = i * block_row_size; - endRow = (startRow + block_row_size) - 1; - if (endRow >= this.getRows()) - endRow = this.getRows() - 1; - - j = 0; - do { - startColumn = j * block_column_size; - endColumn = (startColumn + block_column_size) - 1; - if (endColumn >= this.getColumns()) - endColumn = this.getColumns() - 1; - - BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes()); - update.put(Constants.BLOCK_POSITION, new BlockPosition(startRow, - endRow, startColumn, endColumn).getBytes()); - table.commit(update); - - j++; - } while (endColumn < (this.getColumns() - 1)); - - i++; - } while (endRow < (this.getRows() - 1)); - } - - protected BlockPosition getBlockPosition(int i, int j) throws IOException { - byte[] rs = table.get(new BlockID(i, j).getBytes(), - Bytes.toBytes(Constants.BLOCK_POSITION)).getValue(); - return new BlockPosition(rs); - } - + /** * Using a map/reduce job to block a dense matrix. * @@ -551,51 +499,34 @@ * @throws IOException */ public void blocking_mapred(int blockNum) throws IOException { - this.checkBlockNum(blockNum); + double blocks = Math.pow(blockNum, 0.5); + if (!String.valueOf(blocks).endsWith(".0")) + throw new IOException("can't divide."); + int block_size = (int) blocks; + Matrix blockedMatrix = new DenseMatrix(config); + blockedMatrix.setDimension(block_size, block_size); + this.setBlockedMatrixPath(blockedMatrix.getPath()); + JobConf jobConf = new JobConf(config); jobConf.setJobName("Blocking MR job" + getPath()); jobConf.setNumMapTasks(config.getNumMapTasks()); jobConf.setNumReduceTasks(config.getNumReduceTasks()); - BlockingMapRed.initJob(getPath(), jobConf); - + BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(), + block_size, this.getRows(), this.getColumns(), jobConf); JobManager.execute(jobConf); } - /** - * Using a scanner to block a dense matrix. If the matrix is large, use the - * blocking_mapred() - * - * @param blockNum - * @throws IOException - */ - public void blocking(int blockNum) throws IOException { - this.checkBlockNum(blockNum); - - String[] columns = new String[] { Constants.BLOCK_POSITION }; - Scanner scan = table.getScanner(columns); - - for (RowResult row : scan) { - BlockID bID = new BlockID(row.getRow()); - BlockPosition pos = new BlockPosition(row.get(Constants.BLOCK_POSITION) - .getValue()); - - setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), pos - .getEndRow(), pos.getStartColumn(), pos.getEndColumn())); - } + public String getBlockedMatrixPath() throws IOException { + return Bytes.toString(table.get(Constants.METADATA, + Constants.BLOCK_PATH).getValue()); } - private void checkBlockNum(int blockNum) throws IOException { - double blocks = Math.pow(blockNum, 0.5); - // TODO: Check also it is validation with matrix. - if (!String.valueOf(blocks).endsWith(".0")) - throw new IOException("can't divide."); - - int block_size = (int) blocks; - setBlockPosition(block_size); - setBlockSize(block_size); - LOG.info("Create " + block_size + " * " + block_size + " blocked matrix"); + protected void setBlockedMatrixPath(String path) throws IOException { + BatchUpdate update = new BatchUpdate(Constants.METADATA); + update.put(Constants.BLOCK_PATH, Bytes.toBytes(path)); + table.commit(update); } } Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Tue Dec 30 03:52:47 2008 @@ -1,199 +1,213 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.hama.util.BytesUtil; -import org.apache.log4j.Logger; - -/** - * A sub matrix is a matrix formed by selecting certain rows and columns from a - * bigger matrix. This is a in-memory operation only. - */ -public class SubMatrix implements java.io.Serializable { - private static final long serialVersionUID = 3897536498367921547L; - static final Logger LOG = Logger.getLogger(SubMatrix.class); - private double[][] matrix; - - /** - * Constructor - * - * @param i the size of rows - * @param j the size of columns - */ - public SubMatrix(int i, int j) { - this.matrix = new double[i][j]; - } - - /** - * Constructor - * - * @param c a two dimensional double array - */ - public SubMatrix(double[][] c) { - double[][] matrix = c; - this.matrix = matrix; - } - - public SubMatrix(byte[] matrix) throws IOException { - ByteArrayInputStream bos = new ByteArrayInputStream(matrix); - ObjectInputStream oos = new ObjectInputStream(bos); - Object obj = null; - try { - obj = oos.readObject(); - this.matrix = ((SubMatrix)obj).getDoubleArray(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - oos.close(); - bos.close(); - } - - /** - * Sets the value - * - * @param row - * @param column - * @param value - */ - public void set(int row, int column, double value) { - matrix[row][column] = value; - } - - /** - * Sets the value - * - * @param row - * @param column - * @param value - */ - public void set(int row, int column, byte[] value) { - matrix[row][column] = BytesUtil.bytesToDouble(value); - } - - /** - * Gets the value - * - * @param i - * @param j - * @return the value of submatrix(i, j) - */ - public double get(int i, int j) { - return matrix[i][j]; - } - - public void add(int row, int column, double value) { - matrix[row][column] = matrix[row][column] + value; - } - - /** - * c = a+b - * - * @param b - * @return c - */ - public SubMatrix add(SubMatrix b) { - SubMatrix c = new SubMatrix(this.getRows(), this.getColumns()); - - for (int i = 0; i < this.getRows(); i++) { - for (int j = 0; j < this.getColumns(); j++) { - c.set(i, j, (this.get(i, j) + b.get(i, j))); - } - } - - return c; - } - - /** - * c = a*b - * - * @param b - * @return c - */ - public SubMatrix mult(SubMatrix b) { - SubMatrix c = new SubMatrix(this.getRows(), b.getColumns()); - - for (int i = 0; i < this.getRows(); i++) { - for (int j = 0; j < b.getColumns(); j++) { - for (int k = 0; k < this.getColumns(); k++) { - c.add(i, j, this.get(i, k) * b.get(k, j)); - } - } - } - - return c; - } - - /** - * Gets the number of rows - * - * @return the number of rows - */ - public int getRows() { - return this.matrix.length; - } - - /** - * Gets the number of columns - * - * @return the number of columns - */ - public int getColumns() { - return this.matrix[0].length; - } - - /** - * Close - */ - public void close() { - matrix = null; - } - - /** - * @return the 2d double array - */ - public double[][] getDoubleArray() { - double[][] result = matrix; - return result; - } - - /** - * Gets the bytes of the sub matrix - * - * @return the bytes of the sub matrix - * @throws IOException - */ - public byte[] getBytes() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(this); - oos.flush(); - oos.close(); - bos.close(); - byte[] data = bos.toByteArray(); - return data; - } - -} +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hama.util.BytesUtil; +import org.apache.log4j.Logger; + +/** + * A sub matrix is a matrix formed by selecting certain rows and columns from a + * bigger matrix. This is a in-memory operation only. + */ +public class SubMatrix implements java.io.Serializable { + private static final long serialVersionUID = 3897536498367921547L; + static final Logger LOG = Logger.getLogger(SubMatrix.class); + private double[][] matrix; + + /** + * Constructor + * + * @param i the size of rows + * @param j the size of columns + */ + public SubMatrix(int i, int j) { + this.matrix = new double[i][j]; + } + + /** + * Constructor + * + * @param c a two dimensional double array + */ + public SubMatrix(double[][] c) { + double[][] matrix = c; + this.matrix = matrix; + } + + public SubMatrix(byte[] matrix) throws IOException { + ByteArrayInputStream bos = new ByteArrayInputStream(matrix); + DataInputStream dis = new DataInputStream(bos); + + int rows = dis.readInt(); + int columns = dis.readInt(); + this.matrix = new double[rows][columns]; + + for(int i = 0; i < rows; i++) { + for(int j = 0; j < columns; j++) { + this.matrix[i][j] = dis.readDouble(); + } + } + + dis.close(); + bos.close(); + } + + /** + * Sets the value + * + * @param row + * @param column + * @param value + */ + public void set(int row, int column, double value) { + matrix[row][column] = value; + } + + /** + * Sets the value + * + * @param row + * @param column + * @param value + */ + public void set(int row, int column, byte[] value) { + matrix[row][column] = BytesUtil.bytesToDouble(value); + } + + /** + * Gets the value + * + * @param i + * @param j + * @return the value of submatrix(i, j) + */ + public double get(int i, int j) { + return matrix[i][j]; + } + + public void add(int row, int column, double value) { + matrix[row][column] = matrix[row][column] + value; + } + + /** + * c = a+b + * + * @param b + * @return c + */ + public SubMatrix add(SubMatrix b) { + SubMatrix c = new SubMatrix(this.getRows(), this.getColumns()); + + for (int i = 0; i < this.getRows(); i++) { + for (int j = 0; j < this.getColumns(); j++) { + c.set(i, j, (this.get(i, j) + b.get(i, j))); + } + } + + return c; + } + + /** + * c = a*b + * + * @param b + * @return c + */ + public SubMatrix mult(SubMatrix b) { + SubMatrix c = new SubMatrix(this.getRows(), b.getColumns()); + + for (int i = 0; i < this.getRows(); i++) { + for (int j = 0; j < b.getColumns(); j++) { + for (int k = 0; k < this.getColumns(); k++) { + c.add(i, j, this.get(i, k) * b.get(k, j)); + } + } + } + + return c; + } + + /** + * Gets the number of rows + * + * @return the number of rows + */ + public int getRows() { + return this.matrix.length; + } + + /** + * Gets the number of columns + * + * @return the number of columns + */ + public int getColumns() { + return this.matrix[0].length; + } + + /** + * Close + */ + public void close() { + matrix = null; + } + + /** + * @return the 2d double array + */ + public double[][] getDoubleArray() { + double[][] result = matrix; + return result; + } + + /** + * Gets the bytes of the sub matrix + * + * @return the bytes of the sub matrix + * @throws IOException + */ + public byte[] getBytes() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeInt(this.getRows()); + dos.writeInt(this.getColumns()); + + for(int i = 0; i < this.getRows(); i++) { + for(int j = 0; j < this.getColumns(); j++) { + dos.writeDouble(this.get(i, j)); + } + } + + byte[] data = bos.toByteArray(); + dos.close(); + bos.close(); + return data; + } + +} + Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Tue Dec 30 03:52:47 2008 @@ -21,6 +21,10 @@ import java.io.IOException; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; @@ -69,13 +73,24 @@ public void map(BlockID key, BlockWritable value, OutputCollector output, Reporter reporter) throws IOException { - int blockSize = matrix_b.getBlockSize(); + int blockSize = matrix_b.getRows(); SubMatrix a = value.get(); - - for (int j = 0; j < blockSize; j++) { - SubMatrix b = matrix_b.getBlock(key.getColumn(), j); + HTable table = matrix_b.getHTable(); + + // startKey : new BlockID(key.getColumn(), 0).toString() + // endKey : new BlockID(key.getColumn(), blockSize+1).toString() + Scanner scan = table.getScanner(new byte[][] { Bytes + .toBytes(Constants.BLOCK) }, + new BlockID(key.getColumn(), 0).getBytes(), new BlockID( + key.getColumn(), blockSize + 1).getBytes()); + + for (RowResult row : scan) { + BlockID bid = new BlockID(row.getRow()); + SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue()); SubMatrix c = a.mult(b); - output.collect(new BlockID(key.getRow(), j), new BlockWritable(c)); + output.collect(new BlockID(key.getRow(), bid.getColumn()), + new BlockWritable(c)); } + scan.close(); } } Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Tue Dec 30 03:52:47 2008 @@ -19,20 +19,19 @@ */ package org.apache.hama.io; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; /** A WritableComparable for BlockIDs. */ @SuppressWarnings("unchecked") -public class BlockID implements WritableComparable, java.io.Serializable { - private static final long serialVersionUID = 6434651179475226613L; +public class BlockID implements WritableComparable { + static final Logger LOG = Logger.getLogger(BlockID.class); + public static final int PAD_SIZE = 15; private int row; private int column; @@ -44,18 +43,28 @@ } public BlockID(byte[] bytes) throws IOException { - ByteArrayInputStream bos = new ByteArrayInputStream(bytes); - ObjectInputStream oos = new ObjectInputStream(bos); - Object obj = null; + String rKey = Bytes.toString(bytes); + String keys[] = null; + if (rKey.substring(0, 8).equals("00000000")) { + int i = 8; + while (rKey.charAt(i) == '0') { + i++; + } + keys = rKey.substring(i, rKey.length()).split("[,]"); + } else { + int i = 0; + while (rKey.charAt(i) == '0') { + i++; + } + keys = rKey.substring(i, rKey.length()).split("[,]"); + } + try { - obj = oos.readObject(); - this.row = ((BlockID)obj).getRow(); - this.column = ((BlockID)obj).getColumn(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + this.row = Integer.parseInt(keys[1]); + this.column = Integer.parseInt(keys[2]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e); } - oos.close(); - bos.close(); } public void set(int row, int column) { @@ -72,20 +81,27 @@ } public void readFields(DataInput in) throws IOException { - column = in.readInt(); - row = in.readInt(); + BlockID value = new BlockID(Bytes.readByteArray(in)); + this.row = value.getRow(); + this.column = value.getColumn(); } public void write(DataOutput out) throws IOException { - out.writeInt(column); - out.writeInt(row); + Bytes.writeByteArray(out, Bytes.toBytes(this.toString())); } /** * Make BlockID's string representation be same format. */ public String toString() { - return row + "," + column; + int zeros = PAD_SIZE - String.valueOf(row).length() + - String.valueOf(column).length(); + StringBuffer buf = new StringBuffer(); + for (int i = 0; i < zeros; ++i) { + buf.append("0"); + } + + return buf.toString() + "," + row + "," + column; } @Override @@ -110,19 +126,14 @@ @Override public boolean equals(Object o) { - if(o == null) return false; - if(!(o instanceof BlockID)) return false; + if (o == null) + return false; + if (!(o instanceof BlockID)) + return false; return compareTo(o) == 0; } - public byte[] getBytes() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(this); - oos.flush(); - oos.close(); - bos.close(); - byte[] data = bos.toByteArray(); - return data; + public byte[] getBytes() { + return Bytes.toBytes(this.toString()); } } Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Tue Dec 30 03:52:47 2008 @@ -1,56 +1,77 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.io; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hama.SubMatrix; - -public class BlockWritable implements Writable { - public SubMatrix matrix; - - public BlockWritable() { - this.matrix = new SubMatrix(0, 0); - } - - public BlockWritable(SubMatrix c) { - this.matrix = c; - } - - public BlockWritable(byte[] bytes) throws IOException { - this.matrix = new SubMatrix(bytes); - } - - public void readFields(DataInput in) throws IOException { - this.matrix = new SubMatrix(Bytes.readByteArray(in)); - } - - public void write(DataOutput out) throws IOException { - Bytes.writeByteArray(out, this.matrix.getBytes()); - } - - public SubMatrix get() { - return this.matrix; - } -} +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.io; + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.SubMatrix; + +public class BlockWritable implements Writable { + private SubMatrix matrix; + + public BlockWritable() { + this.matrix = new SubMatrix(0, 0); + } + + public BlockWritable(SubMatrix c) { + this.matrix = c; + } + + public BlockWritable(byte[] bytes) throws IOException { + this.matrix = new SubMatrix(bytes); + } + + public void readFields(DataInput in) throws IOException { + + int rows = in.readInt(); + int columns = in.readInt(); + this.matrix = new SubMatrix(rows, columns); + + for(int i = 0; i < rows; i++) { + for(int j = 0; j < columns; j++) { + this.matrix.set(i, j, in.readDouble()); + } + } + + //this.matrix = new SubMatrix(Bytes.readByteArray(in)); + } + + public void write(DataOutput out) throws IOException { + //Bytes.writeByteArray(out, this.matrix.getBytes()); + + out.writeInt(this.matrix.getRows()); + out.writeInt(this.matrix.getColumns()); + + for(int i = 0; i < this.matrix.getRows(); i++) { + for(int j = 0; j < this.matrix.getColumns(); j++) { + out.writeDouble(this.matrix.get(i, j)); + } + } + } + + public SubMatrix get() { + return this.matrix; + } +} + Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Tue Dec 30 03:52:47 2008 @@ -1,180 +1,197 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hama.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.hama.Constants; -import org.apache.hama.DenseMatrix; -import org.apache.hama.DenseVector; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.SubMatrix; -import org.apache.hama.io.BlockID; -import org.apache.hama.io.VectorWritable; - -/** - * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix - */ -public class BlockingMapRed { - - static final Log LOG = LogFactory.getLog(BlockingMapRed.class); - /** Parameter of the path of the matrix to be blocked * */ - public static final String BLOCKING_MATRIX = "hama.blocking.matrix"; - - /** - * Initialize a job to blocking a table - * - * @param matrixPath - * @param job - */ - public static void initJob(String matrixPath, JobConf job) { - job.setMapperClass(BlockingMapper.class); - job.setReducerClass(BlockingReducer.class); - FileInputFormat.addInputPaths(job, matrixPath); - - job.setInputFormat(VectorInputFormat.class); - job.setMapOutputKeyClass(BlockID.class); - job.setMapOutputValueClass(VectorWritable.class); - job.setOutputFormat(NullOutputFormat.class); - - job.set(BLOCKING_MATRIX, matrixPath); - job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); - } - - /** - * Abstract Blocking Map/Reduce Class to configure the job. - */ - public static abstract class BlockingMapRedBase extends MapReduceBase { - - protected DenseMatrix matrix; - protected int mBlockNum; - protected int mBlockRowSize; - protected int mBlockColSize; - - protected int mRows; - protected int mColumns; - - @Override - public void configure(JobConf job) { - try { - matrix = new DenseMatrix(new HamaConfiguration(), job.get( - BLOCKING_MATRIX, "")); - mBlockNum = matrix.getBlockSize(); - mBlockRowSize = matrix.getRows() / mBlockNum; - mBlockColSize = matrix.getColumns() / mBlockNum; - - mRows = matrix.getRows(); - mColumns = matrix.getColumns(); - } catch (IOException e) { - LOG.warn("Load matrix_blocking failed : " + e.getMessage()); - } - } - - } - - /** - * Mapper Class - */ - public static class BlockingMapper extends BlockingMapRedBase implements - Mapper { - - @Override - public void map(IntWritable key, VectorWritable value, - OutputCollector output, Reporter reporter) - throws IOException { - int startColumn; - int endColumn; - int blkRow = key.get() / mBlockRowSize; - DenseVector dv = value.getDenseVector(); - - int i = 0; - do { - startColumn = i * mBlockColSize; - endColumn = startColumn + mBlockColSize - 1; - if(endColumn >= mColumns) // the last sub vector - endColumn = mColumns - 1; - output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), - dv.subVector(startColumn, endColumn))); - - i++; - } while(endColumn < (mColumns-1)); - } - - } - - /** - * Reducer Class - */ - public static class BlockingReducer extends BlockingMapRedBase implements - Reducer { - - @Override - public void reduce(BlockID key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - // Note: all the sub-vectors are grouped by {@link - // org.apache.hama.io.BlockID} - - // the block's base offset in the original matrix - int colBase = key.getColumn() * mBlockColSize; - int rowBase = key.getRow() * mBlockRowSize; - - // the block's size : rows & columns - int smRows = mBlockRowSize; - if((rowBase + mBlockRowSize - 1) >= mRows) - smRows = mRows - rowBase; - int smCols = mBlockColSize; - if((colBase + mBlockColSize - 1) >= mColumns) - smCols = mColumns - colBase; - - // construct the matrix - SubMatrix subMatrix = new SubMatrix(smRows, smCols); - - // i, j is the current offset in the sub-matrix - int i = 0, j = 0; - while (values.hasNext()) { - VectorWritable vw = values.next(); - // check the size is suitable - if (vw.size() != smCols) - throw new IOException("Block Column Size dismatched."); - i = vw.row - rowBase; - if (i >= smRows || i < 0) - throw new IOException("Block Row Size dismatched."); - - // put the subVector to the subMatrix - for (j = 0; j < smCols; j++) { - subMatrix.set(i, j, vw.get(colBase + j)); - } - } - - matrix.setBlock(key.getRow(), key.getColumn(), subMatrix); - } - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hama.Constants; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.SubMatrix; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.VectorWritable; + +/** + * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix + */ +public class BlockingMapRed { + + static final Log LOG = LogFactory.getLog(BlockingMapRed.class); + /** Parameter of the path of the matrix to be blocked * */ + public static final String BLOCKING_MATRIX = "hama.blocking.matrix"; + public static final String BLOCKED_MATRIX = "hama.blocked.matrix"; + public static final String BLOCK_SIZE = "hama.blocking.size"; + public static final String ROWS = "hama.blocking.rows"; + public static final String COLUMNS = "hama.blocking.columns"; + + /** + * Initialize a job to blocking a table + * + * @param matrixPath + * @param string + * @param j + * @param i + * @param block_size + * @param job + */ + public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) { + job.setMapperClass(BlockingMapper.class); + job.setReducerClass(BlockingReducer.class); + FileInputFormat.addInputPaths(job, matrixPath); + + job.setInputFormat(VectorInputFormat.class); + job.setMapOutputKeyClass(BlockID.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setOutputFormat(NullOutputFormat.class); + + job.set(BLOCKING_MATRIX, matrixPath); + job.set(BLOCKED_MATRIX, string); + job.set(BLOCK_SIZE, String.valueOf(block_size)); + job.set(ROWS, String.valueOf(i)); + job.set(COLUMNS, String.valueOf(j)); + + job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + } + + /** + * Abstract Blocking Map/Reduce Class to configure the job. + */ + public static abstract class BlockingMapRedBase extends MapReduceBase { + + protected DenseMatrix matrix; + protected DenseMatrix blockedMatrix; + protected int mBlockNum; + protected int mBlockRowSize; + protected int mBlockColSize; + + protected int mRows; + protected int mColumns; + + @Override + public void configure(JobConf job) { + try { + matrix = new DenseMatrix(new HamaConfiguration(), job.get( + BLOCKING_MATRIX, "")); + blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get( + BLOCKED_MATRIX, "")); + + mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, "")); + mRows = Integer.parseInt(job.get(ROWS, "")); + mColumns = Integer.parseInt(job.get(COLUMNS, "")); + + mBlockRowSize = mRows / mBlockNum; + mBlockColSize = mColumns / mBlockNum; + } catch (IOException e) { + LOG.warn("Load matrix_blocking failed : " + e.getMessage()); + } + } + + } + + /** + * Mapper Class + */ + public static class BlockingMapper extends BlockingMapRedBase implements + Mapper { + + @Override + public void map(IntWritable key, VectorWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + int startColumn; + int endColumn; + int blkRow = key.get() / mBlockRowSize; + DenseVector dv = value.getDenseVector(); + + int i = 0; + do { + startColumn = i * mBlockColSize; + endColumn = startColumn + mBlockColSize - 1; + if(endColumn >= mColumns) // the last sub vector + endColumn = mColumns - 1; + output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), + dv.subVector(startColumn, endColumn))); + + i++; + } while(endColumn < (mColumns-1)); + } + + } + + /** + * Reducer Class + */ + public static class BlockingReducer extends BlockingMapRedBase implements + Reducer { + + @Override + public void reduce(BlockID key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + // Note: all the sub-vectors are grouped by {@link + // org.apache.hama.io.BlockID} + + // the block's base offset in the original matrix + int colBase = key.getColumn() * mBlockColSize; + int rowBase = key.getRow() * mBlockRowSize; + + // the block's size : rows & columns + int smRows = mBlockRowSize; + if((rowBase + mBlockRowSize - 1) >= mRows) + smRows = mRows - rowBase; + int smCols = mBlockColSize; + if((colBase + mBlockColSize - 1) >= mColumns) + smCols = mColumns - colBase; + + // construct the matrix + SubMatrix subMatrix = new SubMatrix(smRows, smCols); + + // i, j is the current offset in the sub-matrix + int i = 0, j = 0; + while (values.hasNext()) { + VectorWritable vw = values.next(); + // check the size is suitable + if (vw.size() != smCols) + throw new IOException("Block Column Size dismatched."); + i = vw.row - rowBase; + if (i >= smRows || i < 0) + throw new IOException("Block Row Size dismatched."); + + // put the subVector to the subMatrix + for (j = 0; j < smCols; j++) { + subMatrix.set(i, j, vw.get(colBase + j)); + } + } + + blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix); + } + } + +} Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original) +++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Tue Dec 30 03:52:47 2008 @@ -1,357 +1,324 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama; - -import java.io.IOException; -import java.util.Iterator; - -import junit.extensions.TestSetup; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hama.io.BlockPosition; -import org.apache.hama.io.DoubleEntry; -import org.apache.log4j.Logger; - -/** - * Matrix test - */ -public class TestDenseMatrix extends TestCase { - static final Logger LOG = Logger.getLogger(TestDenseMatrix.class); - private static int SIZE = 10; - private static Matrix m1; - private static Matrix m2; - private final static String aliase1 = "matrix_aliase_A"; - private final static String aliase2 = "matrix_aliase_B"; - private static HamaConfiguration conf; - private static HBaseAdmin admin; - private static HamaAdmin hamaAdmin; - - public static Test suite() { - TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) { - protected void setUp() throws Exception { - HCluster hCluster = new HCluster(); - hCluster.setUp(); - - conf = hCluster.getConf(); - admin = new HBaseAdmin(conf); - hamaAdmin = new HamaAdminImpl(conf, admin); - - m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE); - m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE); - } - - protected void tearDown() { - try { - closeTest(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - return setup; - } - - public static void closeTest() throws IOException { - m1.close(); - m2.close(); - } - - public void testEntryAdd() throws IOException { - double origin = m1.get(1, 1); - m1.add(1, 1, 0.5); - - assertEquals(m1.get(1, 1), origin + 0.5); - } - - public void testBlocking() throws IOException, ClassNotFoundException { - assertEquals(((DenseMatrix) m1).isBlocked(), false); - ((DenseMatrix) m1).blocking(4); - assertEquals(((DenseMatrix) m1).isBlocked(), true); - BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0); - double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(), - pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn()) - .getDoubleArray(); - double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray(); - assertEquals(((DenseMatrix) m1).getBlockSize(), 2); - assertEquals(c.length, 5); - - for (int i = 0; i < b.length; i++) { - for (int j = 0; j < b.length; j++) { - assertEquals(b[i][j], c[i][j]); - } - } - } - - /** - * Map/Reduce Blocking Test - * - * @throws IOException - * @throws ClassNotFoundException - */ - public void testMRBlocking() throws IOException, ClassNotFoundException { - assertEquals(((DenseMatrix) m2).isBlocked(), false); - ((DenseMatrix) m2).blocking_mapred(4); - assertEquals(((DenseMatrix) m2).isBlocked(), true); - BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0); - double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(), - pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn()) - .getDoubleArray(); - double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray(); - assertEquals(((DenseMatrix) m2).getBlockSize(), 2); - assertEquals(c.length, 5); - - for (int i = 0; i < b.length; i++) { - for (int j = 0; j < b.length; j++) { - assertEquals(b[i][j], c[i][j]); - } - } - } - - /** - * Column vector test. - * - * @param rand - * @throws IOException - */ - public void testGetColumn() throws IOException { - Vector v = m1.getColumn(0); - Iterator it = v.iterator(); - int x = 0; - while (it.hasNext()) { - assertEquals(m1.get(x, 0), it.next().getValue()); - x++; - } - } - - public void testGetSetAttribute() throws IOException { - m1.setRowLabel(0, "row1"); - assertEquals(m1.getRowLabel(0), "row1"); - assertEquals(m1.getRowLabel(1), null); - - m1.setColumnLabel(0, "column1"); - assertEquals(m1.getColumnLabel(0), "column1"); - assertEquals(m1.getColumnLabel(1), null); - } - - public void testSubMatrix() throws IOException { - SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4 - for (int i = 0; i < a.getRows(); i++) { - for (int j = 0; j < a.getColumns(); j++) { - assertEquals(a.get(i, j), m1.get(i + 2, j + 2)); - } - } - - SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3 - SubMatrix c = a.mult(b); - - double[][] C = new double[3][3]; // A * B - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - for (int k = 0; k < 4; k++) { - C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j); - } - } - } - - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - assertEquals(C[i][j], c.get(i, j)); - } - } - } - - /** - * Test matrices addition - * - * @throws IOException - */ - public void testMatrixAdd() throws IOException { - Matrix result = m1.add(m2); - - assertEquals(result.getRows(), SIZE); - assertEquals(result.getColumns(), SIZE); - - for (int i = 0; i < SIZE; i++) { - for (int j = 0; j < SIZE; j++) { - assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j)); - } - } - } - - /** - * Test matrices multiplication - * - * @throws IOException - */ - public void testMatrixMult() throws IOException { - Matrix result = m1.mult(m2); - - assertEquals(result.getRows(), SIZE); - assertEquals(result.getColumns(), SIZE); - - verifyMultResult(m1, m2, result); - } - - public void testSetRow() throws IOException { - Vector v = new DenseVector(); - double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; - - for (int i = 0; i < SIZE; i++) { - v.set(i, entries[i]); - } - - m1.setRow(SIZE + 1, v); - Iterator it = m1.getRow(SIZE + 1).iterator(); - - int i = 0; - while (it.hasNext()) { - assertEquals(entries[i], it.next().getValue()); - i++; - } - } - - public void testSetColumn() throws IOException { - Vector v = new DenseVector(); - double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; - - for (int i = 0; i < SIZE; i++) { - v.set(i, entries[i]); - } - - m1.setColumn(SIZE + 1, v); - Iterator it = m1.getColumn(SIZE + 1).iterator(); - - int i = 0; - while (it.hasNext()) { - assertEquals(entries[i], it.next().getValue()); - i++; - } - } - - public void testLoadSave() throws IOException { - String path1 = m1.getPath(); - // save m1 to aliase1 - m1.save(aliase1); - // load matrix m1 using aliase1 - DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false); - - for (int i = 0; i < loadTest.getRows(); i++) { - for (int j = 0; j < loadTest.getColumns(); j++) { - assertEquals(m1.get(i, j), loadTest.get(i, j)); - } - } - - assertEquals(path1, loadTest.getPath()); - // close loadTest, it just disconnect to the table but didn't delete it. - loadTest.close(); - - // try to close m1 & load matrix m1 using aliase1 again. - m1.close(); - DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false); - assertEquals(path1, loadTest2.getPath()); - // remove aliase1 - // because loadTest2 connect the aliase1, so we just remove aliase entry - // but didn't delete the table. - hamaAdmin.delete(aliase1); - assertEquals(true, admin.tableExists(path1)); - // close loadTest2, because it is the last one who reference table 'path1' - // it will do the gc! - loadTest2.close(); - assertEquals(false, admin.tableExists(path1)); - - // if we try to load non-existed matrix using aliase name, it should fail. - DenseMatrix loadTest3 = null; - try { - loadTest3 = new DenseMatrix(conf, aliase1, false); - fail("Try to load a non-existed matrix should fail!"); - } catch (IOException e) { - - } finally { - if (loadTest3 != null) - loadTest3.close(); - } - } - - public void testForceCreate() throws IOException { - String path2 = m2.getPath(); - // save m2 to aliase2 - m2.save(aliase2); - // load matrix m2 using aliase2 - DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false); - - for (int i = 0; i < loadTest.getRows(); i++) { - for (int j = 0; j < loadTest.getColumns(); j++) { - assertEquals(m2.get(i, j), loadTest.get(i, j)); - } - } - - assertEquals(path2, loadTest.getPath()); - - // force to create matrix loadTest2 using aliasename 'aliase2' - DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true); - String loadPath2 = loadTest2.getPath(); - assertFalse(path2.equals(loadPath2)); - assertEquals(loadPath2, hamaAdmin.getPath(aliase2)); - assertFalse(path2.equals(hamaAdmin.getPath(aliase2))); - - // try to close m2 & loadTest, it table will be deleted finally - m2.close(); - assertEquals(true, admin.tableExists(path2)); - loadTest.close(); - assertEquals(false, admin.tableExists(path2)); - - // remove 'aliase2' & close loadTest2 - loadTest2.close(); - assertEquals(true, admin.tableExists(loadPath2)); - hamaAdmin.delete(aliase2); - assertEquals(false, admin.tableExists(loadPath2)); - } - - /** - * Verifying multiplication result - * - * @param m1 - * @param m2 - * @param result - * @throws IOException - */ - private void verifyMultResult(Matrix m1, Matrix m2, Matrix result) - throws IOException { - double[][] C = new double[SIZE][SIZE]; - - for (int i = 0; i < SIZE; i++) { - for (int j = 0; j < SIZE; j++) { - for (int k = 0; k < SIZE; k++) { - C[i][k] += m1.get(i, j) * m2.get(j, k); - } - } - } - - for (int i = 0; i < SIZE; i++) { - for (int j = 0; j < SIZE; j++) { - assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String - .valueOf(C[i][j]).substring(0, 14)); - } - } - } -} +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.IOException; +import java.util.Iterator; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hama.io.DoubleEntry; +import org.apache.log4j.Logger; + +/** + * Matrix test + */ +public class TestDenseMatrix extends TestCase { + static final Logger LOG = Logger.getLogger(TestDenseMatrix.class); + private static int SIZE = 10; + private static Matrix m1; + private static Matrix m2; + private final static String aliase1 = "matrix_aliase_A"; + private final static String aliase2 = "matrix_aliase_B"; + private static HamaConfiguration conf; + private static HBaseAdmin admin; + private static HamaAdmin hamaAdmin; + + public static Test suite() { + TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) { + protected void setUp() throws Exception { + HCluster hCluster = new HCluster(); + hCluster.setUp(); + + conf = hCluster.getConf(); + admin = new HBaseAdmin(conf); + hamaAdmin = new HamaAdminImpl(conf, admin); + + m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE); + m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE); + } + + protected void tearDown() { + try { + closeTest(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + return setup; + } + + public static void closeTest() throws IOException { + m1.close(); + m2.close(); + } + + public void testEntryAdd() throws IOException { + double origin = m1.get(1, 1); + m1.add(1, 1, 0.5); + + assertEquals(m1.get(1, 1), origin + 0.5); + } + + /** + * Map/Reduce Blocking Test + * + * @throws IOException + * @throws ClassNotFoundException + */ + public void testMRBlocking() throws IOException, ClassNotFoundException { + assertEquals(((DenseMatrix) m2).isBlocked(), false); + ((DenseMatrix) m2).blocking_mapred(4); + assertEquals(((DenseMatrix) m2).isBlocked(), true); + } + + /** + * Column vector test. + * + * @param rand + * @throws IOException + */ + public void testGetColumn() throws IOException { + Vector v = m1.getColumn(0); + Iterator it = v.iterator(); + int x = 0; + while (it.hasNext()) { + assertEquals(m1.get(x, 0), it.next().getValue()); + x++; + } + } + + public void testGetSetAttribute() throws IOException { + m1.setRowLabel(0, "row1"); + assertEquals(m1.getRowLabel(0), "row1"); + assertEquals(m1.getRowLabel(1), null); + + m1.setColumnLabel(0, "column1"); + assertEquals(m1.getColumnLabel(0), "column1"); + assertEquals(m1.getColumnLabel(1), null); + } + + public void testSubMatrix() throws IOException { + SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4 + for (int i = 0; i < a.getRows(); i++) { + for (int j = 0; j < a.getColumns(); j++) { + assertEquals(a.get(i, j), m1.get(i + 2, j + 2)); + } + } + + SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3 + SubMatrix c = a.mult(b); + + double[][] C = new double[3][3]; // A * B + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + for (int k = 0; k < 4; k++) { + C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j); + } + } + } + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + assertEquals(C[i][j], c.get(i, j)); + } + } + } + + /** + * Test matrices addition + * + * @throws IOException + */ + public void testMatrixAdd() throws IOException { + Matrix result = m1.add(m2); + + assertEquals(result.getRows(), SIZE); + assertEquals(result.getColumns(), SIZE); + + for (int i = 0; i < SIZE; i++) { + for (int j = 0; j < SIZE; j++) { + assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j)); + } + } + } + + /** + * Test matrices multiplication + * + * @throws IOException + */ + public void testMatrixMult() throws IOException { + Matrix result = m1.mult(m2); + + assertEquals(result.getRows(), SIZE); + assertEquals(result.getColumns(), SIZE); + + verifyMultResult(m1, m2, result); + } + + public void testSetRow() throws IOException { + Vector v = new DenseVector(); + double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; + + for (int i = 0; i < SIZE; i++) { + v.set(i, entries[i]); + } + + m1.setRow(SIZE + 1, v); + Iterator it = m1.getRow(SIZE + 1).iterator(); + + int i = 0; + while (it.hasNext()) { + assertEquals(entries[i], it.next().getValue()); + i++; + } + } + + public void testSetColumn() throws IOException { + Vector v = new DenseVector(); + double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; + + for (int i = 0; i < SIZE; i++) { + v.set(i, entries[i]); + } + + m1.setColumn(SIZE + 1, v); + Iterator it = m1.getColumn(SIZE + 1).iterator(); + + int i = 0; + while (it.hasNext()) { + assertEquals(entries[i], it.next().getValue()); + i++; + } + } + + public void testLoadSave() throws IOException { + String path1 = m1.getPath(); + // save m1 to aliase1 + m1.save(aliase1); + // load matrix m1 using aliase1 + DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false); + + for (int i = 0; i < loadTest.getRows(); i++) { + for (int j = 0; j < loadTest.getColumns(); j++) { + assertEquals(m1.get(i, j), loadTest.get(i, j)); + } + } + + assertEquals(path1, loadTest.getPath()); + // close loadTest, it just disconnect to the table but didn't delete it. + loadTest.close(); + + // try to close m1 & load matrix m1 using aliase1 again. + m1.close(); + DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false); + assertEquals(path1, loadTest2.getPath()); + // remove aliase1 + // because loadTest2 connect the aliase1, so we just remove aliase entry + // but didn't delete the table. + hamaAdmin.delete(aliase1); + assertEquals(true, admin.tableExists(path1)); + // close loadTest2, because it is the last one who reference table 'path1' + // it will do the gc! + loadTest2.close(); + assertEquals(false, admin.tableExists(path1)); + + // if we try to load non-existed matrix using aliase name, it should fail. + DenseMatrix loadTest3 = null; + try { + loadTest3 = new DenseMatrix(conf, aliase1, false); + fail("Try to load a non-existed matrix should fail!"); + } catch (IOException e) { + + } finally { + if (loadTest3 != null) + loadTest3.close(); + } + } + + public void testForceCreate() throws IOException { + String path2 = m2.getPath(); + // save m2 to aliase2 + m2.save(aliase2); + // load matrix m2 using aliase2 + DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false); + + for (int i = 0; i < loadTest.getRows(); i++) { + for (int j = 0; j < loadTest.getColumns(); j++) { + assertEquals(m2.get(i, j), loadTest.get(i, j)); + } + } + + assertEquals(path2, loadTest.getPath()); + + // force to create matrix loadTest2 using aliasename 'aliase2' + DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true); + String loadPath2 = loadTest2.getPath(); + assertFalse(path2.equals(loadPath2)); + assertEquals(loadPath2, hamaAdmin.getPath(aliase2)); + assertFalse(path2.equals(hamaAdmin.getPath(aliase2))); + + // try to close m2 & loadTest, it table will be deleted finally + m2.close(); + assertEquals(true, admin.tableExists(path2)); + loadTest.close(); + assertEquals(false, admin.tableExists(path2)); + + // remove 'aliase2' & close loadTest2 + loadTest2.close(); + assertEquals(true, admin.tableExists(loadPath2)); + hamaAdmin.delete(aliase2); + assertEquals(false, admin.tableExists(loadPath2)); + } + + /** + * Verifying multiplication result + * + * @param m1 + * @param m2 + * @param result + * @throws IOException + */ + private void verifyMultResult(Matrix m1, Matrix m2, Matrix result) + throws IOException { + double[][] C = new double[SIZE][SIZE]; + + for (int i = 0; i < SIZE; i++) { + for (int j = 0; j < SIZE; j++) { + for (int k = 0; k < SIZE; k++) { + C[i][k] += m1.get(i, j) * m2.get(j, k); + } + } + } + + for (int i = 0; i < SIZE; i++) { + for (int j = 0; j < SIZE; j++) { + assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String + .valueOf(C[i][j]).substring(0, 14)); + } + } + } +} Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=730103&r1=730102&r2=730103&view=diff ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original) +++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Tue Dec 30 03:52:47 2008 @@ -21,20 +21,13 @@ import java.io.IOException; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; import org.apache.hama.DenseMatrix; import org.apache.hama.HCluster; import org.apache.hama.Matrix; -import org.apache.hama.algebra.BlockCyclicMultiplyMap; -import org.apache.hama.algebra.BlockCyclicMultiplyReduce; -import org.apache.hama.io.BlockID; -import org.apache.hama.io.BlockWritable; import org.apache.log4j.Logger; public class TestBlockMatrixMapReduce extends HCluster { static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class); - static Matrix c; static final int SIZE = 32; /** constructor */ @@ -50,41 +43,22 @@ ((DenseMatrix) m1).blocking_mapred(16); ((DenseMatrix) m2).blocking_mapred(16); - miniMRJob(m1.getPath(), m2.getPath()); + Matrix c = m1.mult(m2); - double[][] C = new double[SIZE][SIZE]; + double[][] mem = new double[SIZE][SIZE]; for (int i = 0; i < SIZE; i++) { for (int j = 0; j < SIZE; j++) { for (int k = 0; k < SIZE; k++) { - C[i][k] += m1.get(i, j) * m2.get(j, k); + mem[i][k] += m1.get(i, j) * m2.get(j, k); } } } for (int i = 0; i < SIZE; i++) { for (int j = 0; j < SIZE; j++) { - assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf( + assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf( c.get(i, j)).substring(0, 5)); } } } - - private void miniMRJob(String string, String string2) throws IOException { - c = new DenseMatrix(conf); - String output = c.getPath(); - - JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class); - jobConf.setJobName("test MR job"); - - BlockCyclicMultiplyMap.initJob(string, string2, - BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class, - jobConf); - BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class, - jobConf); - - jobConf.setNumMapTasks(2); - jobConf.setNumReduceTasks(2); - - JobClient.runJob(jobConf); - } }