incubator-hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r743614 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/examples/
Date Thu, 12 Feb 2009 04:20:50 GMT
Author: edwardyoon
Date: Thu Feb 12 04:20:49 2009
New Revision: 743614

URL: http://svn.apache.org/viewvc?rev=743614&view=rev
Log:
Add multiplication example of file matrices

Added:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java
    incubator/hama/trunk/src/test/org/apache/hama/examples/
    incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksBase.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=743614&r1=743613&r2=743614&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Feb 12 04:20:49 2009
@@ -4,6 +4,7 @@
 
   NEW FEATURES
   
+    HAMA-151: Add multiplication example of file matrices (edwardyoon)
     HAMA-145: Add privacy policy page (edwardyoon)
     HAMA-83: 2D square blocking for dense matrix multiplication (edwardyoon)
     HAMA-104: Add getNumMap/reduceTasks to HamaConfiguration (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=743614&r1=743613&r2=743614&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Thu Feb 12 04:20:49 2009
@@ -29,6 +29,7 @@
       pgd.addClass("random", RandomMatrix.class, "Generate matrix with random elements.");
       pgd.addClass("add", MatrixAddition.class, "Mat-Mat addition.");
       pgd.addClass("mult", MatrixMultiplication.class, "Mat-Mat multiplication.");
+      pgd.addClass("multfiles", MatrixMultiplication.class, "file matrices multiplication.");
       pgd.driver(args);
     } catch (Throwable e) {
       e.printStackTrace();

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java?rev=743614&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java Thu Feb 12 04:20:49 2009
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.BlockMultiplyMap;
+import org.apache.hama.algebra.BlockMultiplyReduce;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.CollectBlocksMap;
+import org.apache.hama.mapred.CollectBlocksMapReduceBase;
+import org.apache.hama.util.JobManager;
+
+public class FileMatrixBlockMult extends AbstractExample {
+  final static Log LOG = LogFactory.getLog(FileMatrixBlockMult.class.getName());
+  private static int BLOCKSIZE;
+  private static int ROWS;
+  private static int COLUMNS;
+
+  /**
+   * Collects blocks from a sequence file of (IntWritable, MapWritable) rows.
+   */
+  public static class MyMapper extends CollectBlocksMapReduceBase implements
+      CollectBlocksMap<IntWritable, MapWritable> {
+    private MapWritable value;
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+        throws IOException {
+      int startColumn;
+      int endColumn;
+      int blkRow = key.get() / mBlockRowSize;
+      this.value = value;
+
+      int i = 0;
+      do {
+        startColumn = i * mBlockColSize;
+        endColumn = startColumn + mBlockColSize - 1;
+        if (endColumn >= mColumns) // the last sub vector
+          endColumn = mColumns - 1;
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+            subVector(startColumn, endColumn)));
+
+        i++;
+      } while (endColumn < (mColumns - 1));
+    }
+
+    private DenseVector subVector(int i0, int i1) {
+      DenseVector res = new DenseVector();
+      for (int i = i0; i <= i1; i++) {
+        res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get());
+      }
+
+      return res;
+    }
+  }
+
+  /**
+   * @param a the path of matrix A
+   * @param b the path of matrix B
+   * @return the result C
+   * @throws IOException
+   */
+  private static DenseMatrix matMult(Path a, Path b) throws IOException {
+    HamaConfiguration conf = new HamaConfiguration();
+    Matrix collectionTable = new DenseMatrix(conf);
+
+    collectBlocksFromFile(a, true, collectionTable.getPath(), conf);
+    collectBlocksFromFile(b, false, collectionTable.getPath(), conf);
+
+    DenseMatrix result = new DenseMatrix(conf);
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+    BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class,
+        BlockID.class, BlockWritable.class, jobConf);
+    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
+        jobConf);
+
+    JobManager.execute(jobConf, result);
+
+    return result;
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 5) {
+      System.out
+          .println("multfiles  [-m maps] [-r reduces] <seqfile1> <seqfile1> <blocks>
<rows> <columns>");
+      System.exit(-1);
+    } else {
+      parseArgs(args);
+    }
+
+    Path a = new Path(ARGS.get(0));
+    Path b = new Path(ARGS.get(1));
+
+    BLOCKSIZE = Integer.parseInt(ARGS.get(2));
+    // The matrix dimensions must be known in advance.
+    ROWS = Integer.parseInt(ARGS.get(3));
+    COLUMNS = Integer.parseInt(ARGS.get(4));
+
+    DenseMatrix result = matMult(a, b);
+    System.out.println("result: " + result.getRows() + " by "
+        + result.getColumns());
+  }
+
+  private static void collectBlocksFromFile(Path path, boolean b,
+      String collectionTable, HamaConfiguration conf) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("Blocking MR job" + path);
+
+    jobConf.setMapperClass(MyMapper.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(jobConf, path);
+
+    MyMapper.initJob(collectionTable, b, BLOCKSIZE, ROWS, COLUMNS, jobConf);
+    JobClient.runJob(jobConf);
+  }
+}
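
For illustration, here is a minimal standalone sketch of the index arithmetic that MyMapper.map() above performs, assuming a 4x4 matrix split into 2x2 blocks (BLOCKSIZE = 2, ROWS = COLUMNS = 4). The class name and main method are hypothetical, not part of this commit:

    // Hypothetical sketch (not part of this commit) of MyMapper.map()'s
    // column-splitting arithmetic for a 4x4 matrix with 2x2 blocks.
    public class BlockIndexSketch {
      public static void main(String[] args) {
        int mBlockNum = 2, mRows = 4, mColumns = 4;
        int mBlockRowSize = mRows / mBlockNum;    // 2 rows per block
        int mBlockColSize = mColumns / mBlockNum; // 2 columns per block

        for (int row = 0; row < mRows; row++) {
          int blkRow = row / mBlockRowSize;       // block row index
          int i = 0, startColumn, endColumn;
          do {
            startColumn = i * mBlockColSize;
            endColumn = Math.min(startColumn + mBlockColSize - 1, mColumns - 1);
            // e.g. row 3 goes to BlockID(1, 0) with columns 0-1,
            // then to BlockID(1, 1) with columns 2-3.
            System.out.println("row " + row + " -> BlockID(" + blkRow + ", " + i
                + "), columns " + startColumn + "-" + endColumn);
            i++;
          } while (endColumn < (mColumns - 1));
        }
      }
    }

Each input row is thus emitted once per block column, keyed by the BlockID of the sub-matrix it belongs to.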

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=743614&r1=743613&r2=743614&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Thu Feb 12 04:20:49 2009
@@ -56,7 +56,6 @@
   public void map(BlockID key, BlockWritable value,
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
-    
     SubMatrix c = value.get(0).mult(value.get(1));
     output.collect(key, new BlockWritable(c));
   }
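
The map above multiplies the two sub-matrices stored in one collection-table row; presumably value.get(0) holds the "a" block and value.get(1) the "b" block, matching the columns written by CollectBlocksMapReduceBase.initJob() below. A hypothetical worked example with the 2x2 blocks the test data yields:

    // Hypothetical worked example (not part of this commit) of one
    // BlockMultiplyMap.map() call on 2x2 sub-matrices.
    public class BlockMultSketch {
      public static void main(String[] args) {
        double[][] a = { { 0.5, 0.1 }, { 0.5, 0.1 } }; // sub-matrix a(i, k)
        double[][] b = { { 0.5, 0.1 }, { 0.5, 0.1 } }; // sub-matrix b(k, j)
        double[][] c = new double[2][2];               // partial product
        for (int i = 0; i < 2; i++)
          for (int k = 0; k < 2; k++)
            for (int j = 0; j < 2; j++)
              c[i][j] += a[i][k] * b[k][j];
        // c is { { 0.30, 0.06 }, { 0.30, 0.06 } }; BlockMultiplyReduce
        // (not shown in this diff) combines such partial products into C.
        System.out.println(c[0][0] + " " + c[0][1]);
      }
    }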

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java?rev=743614&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java Thu Feb 12 04:20:49 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * Abstract Blocking Map/Reduce Class to configure the job.
+ */
+public abstract class CollectBlocksMapReduceBase extends MapReduceBase {
+  /** Configuration keys for the blocking job: block size, dimensions, and matrix position. */
+  public static final String BLOCK_SIZE = "hama.blocking.size";
+  public static final String ROWS = "hama.blocking.rows";
+  public static final String COLUMNS = "hama.blocking.columns";
+  public static final String MATRIX_POS = "a.or.b";
+
+  protected int mBlockNum;
+  protected int mBlockRowSize;
+  protected int mBlockColSize;
+  protected int mRows;
+  protected int mColumns;
+  protected boolean matrixPos;
+
+  @Override
+  public void configure(JobConf job) {
+    mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+    mRows = Integer.parseInt(job.get(ROWS, ""));
+    mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+    mBlockRowSize = mRows / mBlockNum;
+    mBlockColSize = mColumns / mBlockNum;
+
+    matrixPos = job.getBoolean(MATRIX_POS, true);
+  }
+
+  /**
+   * Initializes a job that blocks a matrix into the given collection table.
+   */
+  @SuppressWarnings("unchecked")
+  public static void initJob(String collectionTable, boolean bool,
+      int block_size, int i, int j, JobConf job) {
+    job.setReducerClass(CollectBlocksReducer.class);
+    job.setMapOutputKeyClass(BlockID.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputFormat(BlockOutputFormat.class);
+    job.setOutputKeyClass(BlockID.class);
+    job.setOutputValueClass(BlockWritable.class);
+    job.set(BLOCK_SIZE, String.valueOf(block_size));
+    job.set(ROWS, String.valueOf(i));
+    job.set(COLUMNS, String.valueOf(j));
+    job.setBoolean(MATRIX_POS, bool);
+    job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable);
+
+    if (bool)
+      job.set(BlockOutputFormat.COLUMN, "a");
+    else
+      job.set(BlockOutputFormat.COLUMN, "b");
+  }
+}
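
To make the initJob()/configure() handshake concrete, here is a hypothetical round trip through a bare JobConf, using the key constants defined above. Note that configure() computes block sizes by integer division, so BLOCK_SIZE is implicitly assumed to divide the row and column counts evenly:

    // Hypothetical sketch (not part of this commit): what initJob() stores
    // in the JobConf and what configure() reads back on each task.
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hama.mapred.CollectBlocksMapReduceBase;

    public class ConfRoundTripSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Equivalent of initJob(collectionTable, true, 2, 4, 4, job):
        job.set(CollectBlocksMapReduceBase.BLOCK_SIZE, String.valueOf(2));
        job.set(CollectBlocksMapReduceBase.ROWS, String.valueOf(4));
        job.set(CollectBlocksMapReduceBase.COLUMNS, String.valueOf(4));
        job.setBoolean(CollectBlocksMapReduceBase.MATRIX_POS, true); // true = matrix A ("a" column)

        // Equivalent of configure(job) on the task side:
        int blockNum = Integer.parseInt(job.get(CollectBlocksMapReduceBase.BLOCK_SIZE, ""));
        int rows = Integer.parseInt(job.get(CollectBlocksMapReduceBase.ROWS, ""));
        int columns = Integer.parseInt(job.get(CollectBlocksMapReduceBase.COLUMNS, ""));
        // Integer division: 4 / 2 = 2, so each block holds a 2 x 2 sub-matrix.
        System.out.println((rows / blockNum) + " x " + (columns / blockNum));
      }
    }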

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=743614&r1=743613&r2=743614&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Thu Feb 12 04:20:49 2009
@@ -28,7 +28,7 @@
 /**
 * A Map/Reduce helper class for blocking a DenseMatrix into a block-formatted matrix
  */
-public class CollectBlocksMapper extends CollectBlocksBase implements
+public class CollectBlocksMapper extends CollectBlocksMapReduceBase implements
     CollectBlocksMap<IntWritable, VectorWritable> {
 
   @Override

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=743614&r1=743613&r2=743614&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Thu Feb 12 04:20:49 2009
@@ -32,7 +32,7 @@
 * to avoid duplicated records. Each row has two sub-matrices, a(i, k) and
  * b(k, j).
  */
-public class CollectBlocksReducer extends CollectBlocksBase implements
+public class CollectBlocksReducer extends CollectBlocksMapReduceBase implements
     Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
 
   @Override

Added: incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java?rev=743614&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java Thu Feb 12 04:20:49 2009
@@ -0,0 +1,207 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.BlockMultiplyMap;
+import org.apache.hama.algebra.BlockMultiplyReduce;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.CollectBlocksMap;
+import org.apache.hama.mapred.CollectBlocksMapReduceBase;
+import org.apache.hama.util.JobManager;
+
+public class TestFileMatrixBlockMult extends TestCase {
+  final static Log LOG = LogFactory.getLog(TestFileMatrixBlockMult.class
+      .getName());
+  private static HamaConfiguration conf;
+  private static Path[] path = new Path[2];
+  private static Matrix collectionTable;
+
+  public static Test suite() {
+    TestSetup setup = new TestSetup(
+        new TestSuite(TestFileMatrixBlockMult.class)) {
+      protected void setUp() throws Exception {
+        HCluster hCluster = new HCluster();
+        hCluster.setUp();
+
+        conf = hCluster.getConf();
+        collectionTable = new DenseMatrix(conf);
+      }
+
+      protected void tearDown() {
+      }
+    };
+    return setup;
+  }
+
+  public void testCreateFiles() throws IOException {
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = new LocalFileSystem();
+    fs.setConf(conf);
+    fs.getRawFileSystem().setConf(conf);
+
+    for (int i = 0; i < 2; i++) {
+      path[i] = new Path(System.getProperty("test.build.data", ".") + "/test"
+          + i + ".seq");
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path[i],
+          IntWritable.class, MapWritable.class, CompressionType.BLOCK);
+
+      MapWritable value = new MapWritable();
+      value.put(new IntWritable(0), new DoubleWritable(0.5));
+      value.put(new IntWritable(1), new DoubleWritable(0.1));
+      value.put(new IntWritable(2), new DoubleWritable(0.5));
+      value.put(new IntWritable(3), new DoubleWritable(0.1));
+
+      writer.append(new IntWritable(0), value);
+      writer.append(new IntWritable(1), value);
+      writer.append(new IntWritable(2), value);
+      writer.append(new IntWritable(3), value);
+
+      writer.close();
+    }
+
+    SequenceFile.Reader reader1 = new SequenceFile.Reader(fs, path[0], conf);
+    // read first value from reader1
+    IntWritable key = new IntWritable();
+    MapWritable val = new MapWritable();
+    reader1.next(key, val);
+
+    assertEquals(key.get(), 0);
+  }
+
+  public void testFileMatrixMult() throws IOException {
+    collectBlocksFromFile(path[0], true, collectionTable.getPath(), conf);
+    collectBlocksFromFile(path[1], false, collectionTable.getPath(), conf);
+
+    Matrix result = new DenseMatrix(conf);
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+    BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class,
+        BlockID.class, BlockWritable.class, jobConf);
+    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
+        jobConf);
+
+    JobManager.execute(jobConf, result);
+
+    verifyMultResult(result);
+  }
+
+  private void verifyMultResult(Matrix result) throws IOException {
+    double[][] a = new double[][] { { 0.5, 0.1, 0.5, 0.1 },
+        { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } };
+    double[][] b = new double[][] { { 0.5, 0.1, 0.5, 0.1 },
+        { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } };
+    double[][] c = new double[4][4];
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 4; j++) {
+        for (int k = 0; k < 4; k++) {
+          c[i][k] += a[i][j] * b[j][k];
+        }
+      }
+    }
+
+    for (int i = 0; i < result.getRows(); i++) {
+      for (int j = 0; j < result.getColumns(); j++) {
+        double gap = Math.abs(c[i][j] - result.get(i, j));
+        assertTrue(gap < 0.000001);
+      }
+    }    
+  }
+
+  private static void collectBlocksFromFile(Path path, boolean b,
+      String collectionTable, HamaConfiguration conf) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("Blocking MR job" + path);
+
+    jobConf.setMapperClass(MyMapper.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(jobConf, path);
+
+    MyMapper.initJob(collectionTable, b, 2, 4, 4, jobConf);
+    JobClient.runJob(jobConf);
+  }
+
+  public static class MyMapper extends CollectBlocksMapReduceBase implements
+      CollectBlocksMap<IntWritable, MapWritable> {
+    private MapWritable value;
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+        throws IOException {
+      int startColumn;
+      int endColumn;
+      int blkRow = key.get() / mBlockRowSize;
+      this.value = value;
+
+      int i = 0;
+      do {
+        startColumn = i * mBlockColSize;
+        endColumn = startColumn + mBlockColSize - 1;
+        if (endColumn >= mColumns) // the last sub vector
+          endColumn = mColumns - 1;
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+            subVector(startColumn, endColumn)));
+
+        i++;
+      } while (endColumn < (mColumns - 1));
+    }
+
+    private DenseVector subVector(int i0, int i1) {
+      DenseVector res = new DenseVector();
+      for (int i = i0; i <= i1; i++) {
+        res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get());
+      }
+
+      return res;
+    }
+  }
+}
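
A quick hand check of the values verifyMultResult() expects: every row of both input matrices is (0.5, 0.1, 0.5, 0.1), so b[k][j] depends only on j, and

    c[i][j] = sum_k a[i][k] * b[k][j] = (0.5 + 0.1 + 0.5 + 0.1) * b[0][j] = 1.2 * b[0][j]

which gives (0.6, 0.12, 0.6, 0.12) for every row of the product.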


