incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r830058 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/matrix/ src/java/org/apache/hama/matrix/algebra/ src/test/org/apache/hama/matrix/
Date Tue, 27 Oct 2009 04:55:42 GMT
Author: edwardyoon
Date: Tue Oct 27 04:55:42 2009
New Revision: 830058

URL: http://svn.apache.org/viewvc?rev=830058&view=rev
Log:
Replacement of vector-matrix multiplication Map/Reduce

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 27 04:55:42 2009
@@ -42,6 +42,7 @@
 
   IMPROVEMENTS
 
+    HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)
     HAMA-216: Removing JobManager in util package (edwardyoon)
     HAMA-215: Removing hama shell from version 0.2 (edwardyoon)
     HAMA-204: Replacement of TransposeMap/Reduce (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Tue Oct 27 04:55:42 2009
@@ -20,7 +20,7 @@
 package org.apache.hama.matrix;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -608,6 +608,18 @@
     return result;
   }
 
+  /** Returns true once every job is complete; throws if a completed job failed. */
+  public boolean checkAllJobs(List<Job> jobId) throws IOException {
+    for (Job job : jobId) {
+      if (!job.isComplete()) {
+        return false; // one unfinished job suffices; skip polling the rest
+      } else if (!job.isSuccessful()) {
+        throw new IOException("Job failed: " + job.getJobName());
+      }
+    }
+    return true;
+  }
+  
   public boolean save(String aliasename) throws IOException {
     // mark & update the aliase name in "alise:name" meta column.
     // ! one matrix has only one aliasename now.

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Tue Oct 27 04:55:42 2009
@@ -20,7 +20,9 @@
 package org.apache.hama.matrix;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +35,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
@@ -527,23 +530,42 @@
       columns = this.getColumns();
 
     DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
-
+    List<Job> jobId = new ArrayList<Job>();
+    
     for (int i = 0; i < this.getRows(); i++) {
-      JobConf jobConf = new JobConf(config);
-      jobConf.setJobName("multiplication MR job : " + result.getPath() + " "
-          + i);
-
-      jobConf.setNumMapTasks(config.getNumMapTasks());
-      jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-      DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(),
-          DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
-          jobConf);
-      DenseMatrixVectorMultReduce.initJob(result.getPath(),
-          DenseMatrixVectorMultReduce.class, jobConf);
-      JobClient.runJob(jobConf);
+      Job job = new Job(config, "multiplication MR job : " + result.getPath()
+          + " " + i);
+
+      Scan scan = new Scan();
+      scan.addFamily(Constants.COLUMNFAMILY);
+      job.getConfiguration().set(DenseMatrixVectorMultMap.MATRIX_A,
+          this.getPath());
+      job.getConfiguration().setInt(DenseMatrixVectorMultMap.ITH_ROW, i);
+
+      TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
+          DenseMatrixVectorMultMap.class, IntWritable.class,
+          MapWritable.class, job);
+      TableMapReduceUtil.initTableReducerJob(result.getPath(),
+          DenseMatrixVectorMultReduce.class, job);
+      try {
+        job.waitForCompletion(false);
+        jobId.add(job);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
     }
 
+    while (checkAllJobs(jobId) == false) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+    
     return result;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java Tue Oct 27 04:55:42 2009
@@ -20,6 +20,8 @@
 package org.apache.hama.matrix;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -27,7 +29,9 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
@@ -35,8 +39,6 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -249,20 +251,40 @@
     SparseMatrix result = new SparseMatrix(config, this.getRows(), this
         .getColumns());
 
+    List<Job> jobId = new ArrayList<Job>();
+
     for (int i = 0; i < this.getRows(); i++) {
-      JobConf jobConf = new JobConf(config);
-      jobConf.setJobName("multiplication MR job : " + result.getPath() + " "
-          + i);
+      Job job = new Job(config, "multiplication MR job : " + result.getPath()
+          + " " + i);
 
-      jobConf.setNumMapTasks(config.getNumMapTasks());
-      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+      Scan scan = new Scan();
+      scan.addFamily(Constants.COLUMNFAMILY);
+      job.getConfiguration().set(SparseMatrixVectorMultMap.MATRIX_A,
+          this.getPath());
+      job.getConfiguration().setInt(SparseMatrixVectorMultMap.ITH_ROW, i);
 
-      SparseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(),
+      TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
           SparseMatrixVectorMultMap.class, IntWritable.class,
-          MapWritable.class, jobConf);
-      SparseMatrixVectorMultReduce.initJob(result.getPath(),
-          SparseMatrixVectorMultReduce.class, jobConf);
-      JobClient.runJob(jobConf);
+          MapWritable.class, job);
+      TableMapReduceUtil.initTableReducerJob(result.getPath(),
+          SparseMatrixVectorMultReduce.class, job);
+      try {
+        job.waitForCompletion(false);
+        jobId.add(job);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+
+    while (checkAllJobs(jobId) == false) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
     }
 
     return result;

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java Tue Oct 27 04:55:42 2009
@@ -21,65 +21,73 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.mapred.VectorInputFormat;
 import org.apache.hama.matrix.DenseMatrix;
 import org.apache.hama.matrix.DenseVector;
-import org.apache.log4j.Logger;
+import org.apache.hama.util.BytesUtil;
 
-public class DenseMatrixVectorMultMap extends MapReduceBase implements
-    Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
-  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultMap.class);
+/**
+ * Mapper computing one row of the dense product C = A * B.
+ * <p>
+ * Each job handles row i (config key {@link #ITH_ROW}) of matrix A (config
+ * key {@link #MATRIX_A}). The job scans matrix B; for B's row j the mapper
+ * emits that row scaled by A[i][j], keyed by i, for the reducer to sum.
+ */
+public class DenseMatrixVectorMultMap extends
+    TableMapper<IntWritable, MapWritable> implements Configurable {
+  private Configuration conf = null;
   protected DenseVector currVector;
   public static final String ITH_ROW = "ith.row";
   public static final String MATRIX_A = "hama.multiplication.matrix.a";
-  public static final String MATRIX_B = "hama.multiplication.matrix.b";
   private IntWritable nKey = new IntWritable();
-  
-  public void configure(JobConf job) {
-    DenseMatrix matrix_a;
-      try {
-        matrix_a = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
-        int ithRow = job.getInt(ITH_ROW, 0);
-        nKey.set(ithRow);
-        currVector = matrix_a.getRow(ithRow);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
+
+  /**
+   * Emits the scanned row j of B scaled by A[i][j], keyed by i. Zero
+   * coefficients are skipped because they add nothing to the sum.
+   */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    double ithjth = currVector.get(BytesUtil.getRowIndex(key.get()));
+    if (ithjth != 0) {
+      DenseVector scaled = new DenseVector(value).scale(ithjth);
+      context.write(nKey, scaled.getEntries());
+    }
   }
 
-  public static void initJob(int i, String matrix_a, String matrix_b,
-      Class<DenseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
-      Class<MapWritable> outputValueClass, JobConf jobConf) {
-
-    jobConf.setMapOutputValueClass(outputValueClass);
-    jobConf.setMapOutputKeyClass(outputKeyClass);
-    jobConf.setMapperClass(map);
-    jobConf.setInt(ITH_ROW, i);
-    jobConf.set(MATRIX_A, matrix_a);
-    jobConf.set(MATRIX_B, matrix_b);
-    
-    jobConf.setInputFormat(VectorInputFormat.class);
-    FileInputFormat.addInputPaths(jobConf, matrix_b);
-    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  @Override
+  public Configuration getConf() {
+    return conf;
   }
 
+  /**
+   * Stores the configuration and loads row i of matrix A, which every
+   * map() call needs.
+   */
   @Override
-  public void map(IntWritable key, MapWritable value,
-      OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
-      throws IOException {
-
-    DenseVector scaled = new DenseVector(value).scale(currVector.get(key.get()));
-    output.collect(nKey, scaled.getEntries());
-    
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String matrixA = conf.get(MATRIX_A, "");
+    int ithRow = conf.getInt(ITH_ROW, 0);
+    nKey.set(ithRow);
+    try {
+      // Open A with the task's configuration rather than a fresh default
+      // HamaConfiguration, so settings carried by the job are passed along.
+      DenseMatrix matrix_a = new DenseMatrix(new HamaConfiguration(conf),
+          matrixA);
+      currVector = matrix_a.getRow(ithRow);
+    } catch (IOException e) {
+      // Without row i of A every map() call would hit a null currVector,
+      // so fail the task here and keep the real cause.
+      throw new IllegalStateException("Cannot load row " + ithRow
+          + " of matrix " + matrixA, e);
+    }
   }
-}
\ No newline at end of file
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java Tue Oct 27 04:55:42 2009
@@ -20,50 +20,27 @@
 package org.apache.hama.matrix.algebra;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.VectorOutputFormat;
 import org.apache.hama.matrix.DenseVector;
-import org.apache.log4j.Logger;
+import org.apache.hama.util.BytesUtil;
+
+public class DenseMatrixVectorMultReduce extends
+    TableReducer<IntWritable, MapWritable, Writable> {
 
-public class DenseMatrixVectorMultReduce extends MapReduceBase implements
-    Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
-  static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultReduce.class);
-  
-  /**
-   * Use this before submitting a TableReduce job. It will appropriately set up
-   * the JobConf.
-   * 
-   * @param table
-   * @param reducer
-   * @param job
-   */
-  public static void initJob(String table,
-      Class<DenseMatrixVectorMultReduce> reducer, JobConf job) {
-    job.setOutputFormat(VectorOutputFormat.class);
-    job.setReducerClass(reducer);
-    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(BatchUpdate.class);
-  }
-  
   @Override
-  public void reduce(IntWritable key, Iterator<MapWritable> values,
-      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
-      throws IOException {
+  public void reduce(IntWritable key, Iterable<MapWritable> values,
+      Context context) throws IOException, InterruptedException {
     DenseVector sum = new DenseVector();
-    
-    while (values.hasNext()) {
-      DenseVector nVector = new DenseVector(values.next());
+
+    for (MapWritable value : values) {
+      DenseVector nVector = new DenseVector(value);
+      
       if(sum.size() == 0) {
         sum.zeroFill(nVector.size());
         sum.add(nVector);
@@ -75,7 +52,7 @@
     VectorUpdate update = new VectorUpdate(key.get());
     update.putAll(sum.getEntries());
 
-    output.collect(key, update);
+    context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key.get())),
+        update.getPut());
   }
-
-}
+}
\ No newline at end of file

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java Tue Oct 27 04:55:42 2009
@@ -21,68 +21,73 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.mapred.VectorInputFormat;
 import org.apache.hama.matrix.SparseMatrix;
 import org.apache.hama.matrix.SparseVector;
-import org.apache.log4j.Logger;
+import org.apache.hama.util.BytesUtil;
 
-public class SparseMatrixVectorMultMap extends MapReduceBase implements
-    Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
-  static final Logger LOG = Logger.getLogger(SparseMatrixVectorMultMap.class);
+/**
+ * Mapper computing one row of the sparse product C = A * B.
+ * <p>
+ * Each job handles row i (config key {@link #ITH_ROW}) of matrix A (config
+ * key {@link #MATRIX_A}). The job scans matrix B; for B's row j the mapper
+ * emits that row scaled by A[i][j], keyed by i, for the reducer to sum.
+ */
+public class SparseMatrixVectorMultMap extends
+    TableMapper<IntWritable, MapWritable> implements Configurable {
+  private Configuration conf = null;
   protected SparseVector currVector;
   public static final String ITH_ROW = "ith.row";
   public static final String MATRIX_A = "hama.multiplication.matrix.a";
-  public static final String MATRIX_B = "hama.multiplication.matrix.b";
   private IntWritable nKey = new IntWritable();
-  
-  public void configure(JobConf job) {
-      SparseMatrix matrix_a;
-      try {
-        matrix_a = new SparseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
-        int ithRow = job.getInt(ITH_ROW, 0);
-        nKey.set(ithRow);
-        currVector = matrix_a.getRow(ithRow);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-  }
 
-  public static void initJob(int i, String matrix_a, String matrix_b,
-      Class<SparseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
-      Class<MapWritable> outputValueClass, JobConf jobConf) {
-
-    jobConf.setMapOutputValueClass(outputValueClass);
-    jobConf.setMapOutputKeyClass(outputKeyClass);
-    jobConf.setMapperClass(map);
-    jobConf.setInt(ITH_ROW, i);
-    jobConf.set(MATRIX_A, matrix_a);
-    jobConf.set(MATRIX_B, matrix_b);
-    
-    jobConf.setInputFormat(VectorInputFormat.class);
-    FileInputFormat.addInputPaths(jobConf, matrix_b);
-    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  /**
+   * Emits the scanned row j of B scaled by A[i][j], keyed by i. Zero
+   * coefficients are skipped because they add nothing to the sum.
+   */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    double ithjth = currVector.get(BytesUtil.getRowIndex(key.get()));
+    if (ithjth != 0) {
+      SparseVector scaled = new SparseVector(value).scale(ithjth);
+      context.write(nKey, scaled.getEntries());
+    }
   }
 
   @Override
-  public void map(IntWritable key, MapWritable value,
-      OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
-      throws IOException {
+  public Configuration getConf() {
+    return conf;
+  }
 
-    double ithjth = currVector.get(key.get());
-    if(ithjth != 0) {
-      SparseVector scaled = new SparseVector(value).scale(ithjth);
-      output.collect(nKey, scaled.getEntries());
+  /**
+   * Stores the configuration and loads row i of matrix A, which every
+   * map() call needs.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String matrixA = conf.get(MATRIX_A, "");
+    int ithRow = conf.getInt(ITH_ROW, 0);
+    nKey.set(ithRow);
+    try {
+      // Open A with the task's configuration rather than a fresh default
+      // HamaConfiguration, so settings carried by the job are passed along.
+      SparseMatrix matrix_a = new SparseMatrix(new HamaConfiguration(conf),
+          matrixA);
+      currVector = matrix_a.getRow(ithRow);
+    } catch (IOException e) {
+      // Without row i of A every map() call would hit a null currVector,
+      // so fail the task here and keep the real cause.
+      throw new IllegalStateException("Cannot load row " + ithRow
+          + " of matrix " + matrixA, e);
     }
-    
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java Tue Oct 27 04:55:42 2009
@@ -20,57 +20,39 @@
 package org.apache.hama.matrix.algebra;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.VectorOutputFormat;
 import org.apache.hama.matrix.SparseVector;
-import org.apache.log4j.Logger;
+import org.apache.hama.util.BytesUtil;
 
-public class SparseMatrixVectorMultReduce extends MapReduceBase implements
-    Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
-  static final Logger LOG = Logger
-      .getLogger(SparseMatrixVectorMultReduce.class);
-
-  /**
-   * Use this before submitting a TableReduce job. It will appropriately set up
-   * the JobConf.
-   * 
-   * @param table
-   * @param reducer
-   * @param job
-   */
-  public static void initJob(String table,
-      Class<SparseMatrixVectorMultReduce> reducer, JobConf job) {
-    job.setOutputFormat(VectorOutputFormat.class);
-    job.setReducerClass(reducer);
-    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(BatchUpdate.class);
-  }
+/**
+ * Sums the scaled sparse rows that {@link SparseMatrixVectorMultMap} emits for
+ * one row index and writes the total to the output table as the Put built by
+ * {@link VectorUpdate#getPut()}.
+ */
+public class SparseMatrixVectorMultReduce extends
+    TableReducer<IntWritable, MapWritable, Writable> {
 
   @Override
-  public void reduce(IntWritable key, Iterator<MapWritable> values,
-      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
-      throws IOException {
-    SparseVector sum = new SparseVector();
+  public void reduce(IntWritable key, Iterable<MapWritable> values,
+      Context context) throws IOException, InterruptedException {
+    int row = key.get();
 
-    while (values.hasNext()) {
-      sum.add(new SparseVector(values.next()));
+    // Accumulate every partial row received for this row index.
+    SparseVector total = new SparseVector();
+    for (MapWritable partial : values) {
+      total.add(new SparseVector(partial));
     }
-    
-    VectorUpdate update = new VectorUpdate(key.get());
-    update.putAll(sum.getEntries());
-
-    output.collect(key, update);
+
+    VectorUpdate rowUpdate = new VectorUpdate(row);
+    rowUpdate.putAll(total.getEntries());
+    ImmutableBytesWritable rowKey =
+        new ImmutableBytesWritable(BytesUtil.getRowIndex(row));
+    context.write(rowKey, rowUpdate.getPut());
   }
-
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java?rev=830058&r1=830057&r2=830058&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java Tue Oct 27 04:55:42 2009
@@ -63,7 +63,7 @@
     assertEquals(vector2.get(0), 3.0);
     assertEquals(vector2.get(1), -8.0);
   }
-  
+
   public void sparsity() throws IOException {
     boolean appeared = false;
     for (int i = 0; i < m1.getRows(); i++) {



Mime
View raw message