Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: (qmail 38314 invoked from network); 12 Feb 2009 04:21:15 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Feb 2009 04:21:15 -0000 Received: (qmail 22214 invoked by uid 500); 12 Feb 2009 04:21:15 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 22188 invoked by uid 500); 12 Feb 2009 04:21:15 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 22177 invoked by uid 99); 12 Feb 2009 04:21:15 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2009 20:21:15 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Feb 2009 04:21:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CC29523888F1; Thu, 12 Feb 2009 04:20:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r743614 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/examples/ Date: Thu, 12 Feb 2009 04:20:50 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090212042050.CC29523888F1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Thu Feb 12 04:20:49 2009 New Revision: 743614 URL: http://svn.apache.org/viewvc?rev=743614&view=rev Log: Add multiplication example of file matrices 
Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java incubator/hama/trunk/src/test/org/apache/hama/examples/ incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java Removed: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=743614&r1=743613&r2=743614&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Thu Feb 12 04:20:49 2009 @@ -4,6 +4,7 @@ NEW FEATURES + HAMA-151: Add multiplication example of file matrices (edwardyoon) HAMA-145: Add privacy policy page (edwardyoon) HAMA-83: 2D sqaure blocking for dense matrix multiplication (edwardyoon) HAMA-104: Add getNumMap/reduceTasks to HamaConfiguration (edwardyoon) Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=743614&r1=743613&r2=743614&view=diff ============================================================================== --- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original) +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Thu Feb 12 04:20:49 2009 @@ -29,6 +29,7 @@ pgd.addClass("random", RandomMatrix.class, "Generate matrix with random 
elements."); pgd.addClass("add", MatrixAddition.class, "Mat-Mat addition."); pgd.addClass("mult", MatrixMultiplication.class, "Mat-Mat multiplication."); + pgd.addClass("multfiles", FileMatrixBlockMult.class, "file matrices multiplication."); pgd.driver(args); } catch (Throwable e) { e.printStackTrace(); Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java?rev=743614&view=auto ============================================================================== --- incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java (added) +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java Thu Feb 12 04:20:49 2009 @@ -0,0 +1,155 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.examples; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.Matrix; +import org.apache.hama.algebra.BlockMultiplyMap; +import org.apache.hama.algebra.BlockMultiplyReduce; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.CollectBlocksMap; +import org.apache.hama.mapred.CollectBlocksMapReduceBase; +import org.apache.hama.util.JobManager; + +public class FileMatrixBlockMult extends AbstractExample { + final static Log LOG = LogFactory.getLog(FileMatrixBlockMult.class.getName()); + private static int BLOCKSIZE; + private static int ROWS; + private static int COLUMNS; + + /** + * Collect blocks from sequence file, + */ + public static class MyMapper extends CollectBlocksMapReduceBase implements + CollectBlocksMap { + private MapWritable value; + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + int startColumn; + int endColumn; + int blkRow = key.get() / mBlockRowSize; + this.value = value; + + int i = 0; + do { + startColumn = i * mBlockColSize; + endColumn = startColumn + mBlockColSize - 1; + if (endColumn >= mColumns) // the last sub vector + endColumn = mColumns - 1; + output.collect(new 
BlockID(blkRow, i), new VectorWritable(key.get(), + subVector(startColumn, endColumn))); + + i++; + } while (endColumn < (mColumns - 1)); + } + + private DenseVector subVector(int i0, int i1) { + DenseVector res = new DenseVector(); + for (int i = i0; i <= i1; i++) { + res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get()); + } + + return res; + } + } + + /** + * @param a the path of matrix A + * @param b the path of matrix B + * @return the result C + * @throws IOException + */ + private static DenseMatrix matMult(Path a, Path b) throws IOException { + HamaConfiguration conf = new HamaConfiguration(); + Matrix collectionTable = new DenseMatrix(conf); + + collectBlocksFromFile(a, true, collectionTable.getPath(), conf); + collectBlocksFromFile(b, false, collectionTable.getPath(), conf); + + DenseMatrix result = new DenseMatrix(conf); + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("multiplication MR job : " + result.getPath()); + + BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class, + BlockID.class, BlockWritable.class, jobConf); + BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class, + jobConf); + + JobManager.execute(jobConf, result); + + return result; + } + + public static void main(String[] args) throws IOException { + if (args.length < 5) { + System.out + .println("multfiles [-m maps] [-r reduces] "); + System.exit(-1); + } else { + parseArgs(args); + } + + Path a = new Path(ARGS.get(0)); + Path b = new Path(ARGS.get(1)); + + BLOCKSIZE = Integer.parseInt(ARGS.get(2)); + // You should know dimensions + ROWS = Integer.parseInt(ARGS.get(3)); + COLUMNS = Integer.parseInt(ARGS.get(4)); + + DenseMatrix result = matMult(a, b); + System.out.println("result: " + result.getRows() + " by " + + result.getColumns()); + } + + private static void collectBlocksFromFile(Path path, boolean b, + String collectionTable, HamaConfiguration conf) throws IOException { + JobConf jobConf = new JobConf(conf); + 
jobConf.setJobName("Blocking MR job" + path); + + jobConf.setMapperClass(MyMapper.class); + jobConf.setInputFormat(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(jobConf, path); + + MyMapper.initJob(collectionTable, b, BLOCKSIZE, ROWS, COLUMNS, jobConf); + JobClient.runJob(jobConf); + } +} Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=743614&r1=743613&r2=743614&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Thu Feb 12 04:20:49 2009 @@ -56,7 +56,6 @@ public void map(BlockID key, BlockWritable value, OutputCollector output, Reporter reporter) throws IOException { - SubMatrix c = value.get(0).mult(value.get(1)); output.collect(key, new BlockWritable(c)); } Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java?rev=743614&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java Thu Feb 12 04:20:49 2009 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.mapred; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorWritable; + +/** + * Abstract Blocking Map/Reduce Class to configure the job. + */ +public abstract class CollectBlocksMapReduceBase extends MapReduceBase { + /** Parameter of the path of the matrix to be blocked * */ + public static final String BLOCK_SIZE = "hama.blocking.size"; + public static final String ROWS = "hama.blocking.rows"; + public static final String COLUMNS = "hama.blocking.columns"; + public static final String MATRIX_POS = "a.ore.b"; + + protected int mBlockNum; + protected int mBlockRowSize; + protected int mBlockColSize; + protected int mRows; + protected int mColumns; + protected boolean matrixPos; + + @Override + public void configure(JobConf job) { + mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, "")); + mRows = Integer.parseInt(job.get(ROWS, "")); + mColumns = Integer.parseInt(job.get(COLUMNS, "")); + + mBlockRowSize = mRows / mBlockNum; + mBlockColSize = mColumns / mBlockNum; + + matrixPos = job.getBoolean(MATRIX_POS, true); + } + + /** + * Initialize a job to blocking a table + */ + @SuppressWarnings("unchecked") + public static void initJob(String collectionTable, boolean bool, + int block_size, int i, int j, JobConf job) { + job.setReducerClass(CollectBlocksReducer.class); + job.setMapOutputKeyClass(BlockID.class); + job.setMapOutputValueClass(VectorWritable.class); + + 
job.setOutputFormat(BlockOutputFormat.class); + job.setOutputKeyClass(BlockID.class); + job.setOutputValueClass(BlockWritable.class); + job.set(BLOCK_SIZE, String.valueOf(block_size)); + job.set(ROWS, String.valueOf(i)); + job.set(COLUMNS, String.valueOf(j)); + job.setBoolean(MATRIX_POS, bool); + job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable); + + if (bool) + job.set(BlockOutputFormat.COLUMN, "a"); + else + job.set(BlockOutputFormat.COLUMN, "b"); + } +} Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=743614&r1=743613&r2=743614&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Thu Feb 12 04:20:49 2009 @@ -28,7 +28,7 @@ /** * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix */ -public class CollectBlocksMapper extends CollectBlocksBase implements +public class CollectBlocksMapper extends CollectBlocksMapReduceBase implements CollectBlocksMap { @Override Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=743614&r1=743613&r2=743614&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Thu Feb 12 04:20:49 2009 @@ -32,7 +32,7 @@ * to avoid duplicated records. Each row has a two sub matrices of a(i, k) and * b(k, j). 
*/ -public class CollectBlocksReducer extends CollectBlocksBase implements +public class CollectBlocksReducer extends CollectBlocksMapReduceBase implements Reducer { @Override Added: incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java?rev=743614&view=auto ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (added) +++ incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java Thu Feb 12 04:20:49 2009 @@ -0,0 +1,207 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.examples; + +import java.io.IOException; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HCluster; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.Matrix; +import org.apache.hama.algebra.BlockMultiplyMap; +import org.apache.hama.algebra.BlockMultiplyReduce; +import org.apache.hama.io.BlockID; +import org.apache.hama.io.BlockWritable; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.CollectBlocksMap; +import org.apache.hama.mapred.CollectBlocksMapReduceBase; +import org.apache.hama.util.JobManager; + +public class TestFileMatrixBlockMult extends TestCase { + final static Log LOG = LogFactory.getLog(TestFileMatrixBlockMult.class + .getName()); + private static HamaConfiguration conf; + private static Path[] path = new Path[2]; + private static Matrix collectionTable; + + public static Test suite() { + TestSetup setup = new TestSetup( + new TestSuite(TestFileMatrixBlockMult.class)) { + protected void setUp() throws Exception { + HCluster hCluster = new HCluster(); + hCluster.setUp(); + + conf 
= hCluster.getConf(); + collectionTable = new DenseMatrix(conf); + } + + protected void tearDown() { + } + }; + return setup; + } + + public void testCreateFiles() throws IOException { + Configuration conf = new Configuration(); + LocalFileSystem fs = new LocalFileSystem(); + fs.setConf(conf); + fs.getRawFileSystem().setConf(conf); + + for (int i = 0; i < 2; i++) { + path[i] = new Path(System.getProperty("test.build.data", ".") + "/test" + + i + ".seq"); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path[i], + IntWritable.class, MapWritable.class, CompressionType.BLOCK); + + MapWritable value = new MapWritable(); + value.put(new IntWritable(0), new DoubleWritable(0.5)); + value.put(new IntWritable(1), new DoubleWritable(0.1)); + value.put(new IntWritable(2), new DoubleWritable(0.5)); + value.put(new IntWritable(3), new DoubleWritable(0.1)); + + writer.append(new IntWritable(0), value); + writer.append(new IntWritable(1), value); + writer.append(new IntWritable(2), value); + writer.append(new IntWritable(3), value); + + writer.close(); + } + + SequenceFile.Reader reader1 = new SequenceFile.Reader(fs, path[0], conf); + // read first value from reader1 + IntWritable key = new IntWritable(); + MapWritable val = new MapWritable(); + reader1.next(key, val); + + assertEquals(key.get(), 0); + } + + public void testFileMatrixMult() throws IOException { + collectBlocksFromFile(path[0], true, collectionTable.getPath(), conf); + collectBlocksFromFile(path[1], false, collectionTable.getPath(), conf); + + Matrix result = new DenseMatrix(conf); + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("multiplication MR job : " + result.getPath()); + + BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class, + BlockID.class, BlockWritable.class, jobConf); + BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class, + jobConf); + + JobManager.execute(jobConf, result); + + verifyMultResult(result); + } + + private void 
verifyMultResult(Matrix result) throws IOException { + double[][] a = new double[][] { { 0.5, 0.1, 0.5, 0.1 }, + { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } }; + double[][] b = new double[][] { { 0.5, 0.1, 0.5, 0.1 }, + { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } }; + double[][] c = new double[4][4]; + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 4; j++) { + for (int k = 0; k < 4; k++) { + c[i][k] += a[i][j] * b[j][k]; + } + } + } + + for (int i = 0; i < result.getRows(); i++) { + for (int j = 0; j < result.getColumns(); j++) { + double gap = (c[i][j] - result.get(i, j)); + assertTrue(Math.abs(gap) < 0.000001); // symmetric tolerance: a large negative gap must fail too + } + } + } + + private static void collectBlocksFromFile(Path path, boolean b, + String collectionTable, HamaConfiguration conf) throws IOException { + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("Blocking MR job" + path); + + jobConf.setMapperClass(MyMapper.class); + jobConf.setInputFormat(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(jobConf, path); + + MyMapper.initJob(collectionTable, b, 2, 4, 4, jobConf); + JobClient.runJob(jobConf); + } + + public static class MyMapper extends CollectBlocksMapReduceBase implements + CollectBlocksMap { + private MapWritable value; + + @Override + public void map(IntWritable key, MapWritable value, + OutputCollector output, Reporter reporter) + throws IOException { + int startColumn; + int endColumn; + int blkRow = key.get() / mBlockRowSize; + this.value = value; + + int i = 0; + do { + startColumn = i * mBlockColSize; + endColumn = startColumn + mBlockColSize - 1; + if (endColumn >= mColumns) // the last sub vector + endColumn = mColumns - 1; + output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), + subVector(startColumn, endColumn))); + + i++; + } while (endColumn < (mColumns - 1)); + } + + private DenseVector subVector(int i0, int i1) { + DenseVector res = new DenseVector(); + for (int i = i0; i <= 
i1; i++) { + res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get()); + } + + return res; + } + } +}