incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r828314 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/matrix/ src/java/org/apache/hama/matrix/algebra/ src/test/org/apache/hama/matrix/
Date Thu, 22 Oct 2009 05:42:58 GMT
Author: edwardyoon
Date: Thu Oct 22 05:42:57 2009
New Revision: 828314

URL: http://svn.apache.org/viewvc?rev=828314&view=rev
Log:
Replacement of TableReadMapper

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
    incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=828314&r1=828313&r2=828314&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Oct 22 05:42:57 2009
@@ -41,6 +41,7 @@
 
   IMPROVEMENTS
 
+    HAMA-213: Replacement of TableReadMapper (edwardyoon)
     HAMA-203: Replacement of RandomMatrixMap/Reduce (edwardyoon)
     HAMA-197: Replacement for deprecated API of Hbase (edwardyoon)
     HAMA-188: Upgrade dependencies for hadoop/hbase 2.0 (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=828314&r1=828313&r2=828314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Oct 22 05:42:57 2009
@@ -84,7 +84,10 @@
   public static final int DEFAULT_TRY_TIMES = 10000000;
   
   /** block data column */
+  @Deprecated
   public static final String BLOCK = "block:";
   
+  public static final String BLOCK_FAMILY = "block";
+  
   public static final Text ROWCOUNT= new Text("row");
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=828314&r1=828313&r2=828314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Thu Oct 22 05:42:57 2009
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,13 +35,10 @@
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
-import org.apache.hadoop.hbase.mapred.TableMap;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -48,9 +46,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaAdmin;
 import org.apache.hama.HamaAdminImpl;
@@ -408,35 +404,36 @@
 
   }
 
-  /**
-   * Just full scan a table.
-   */
-  public static class TableReadMapper extends MapReduceBase implements
-      TableMap<ImmutableBytesWritable, BatchUpdate> {
+  public static class ScanMapper extends
+      TableMapper<ImmutableBytesWritable, Put> {
     private static List<Double> alpha = new ArrayList<Double>();
 
-    @SuppressWarnings("unchecked")
-    public void map(ImmutableBytesWritable key, RowResult value,
-        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
-        @SuppressWarnings("unused")
-        Reporter reporter) throws IOException {
-
-      BatchUpdate update = new BatchUpdate(key.get());
-      for (Map.Entry<byte[], Cell> e : value.entrySet()) {
-        if (alpha.size() == 0) {
-          update.put(e.getKey(), e.getValue().getValue());
-        } else {
-          String column = new String(e.getKey());
-          if (column.startsWith(Constants.COLUMN)) {
-            double currValue = BytesUtil.bytesToDouble(e.getValue().getValue());
-            update.put(e.getKey(), (BytesUtil.doubleToBytes(currValue
-                * alpha.get(0))));
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      Put put = new Put(key.get());
+
+      NavigableMap<byte[], NavigableMap<byte[], byte[]>> map = value
+          .getNoVersionMap();
+      for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> a : map.entrySet()) {
+        byte[] family = a.getKey();
+        for (Map.Entry<byte[], byte[]> b : a.getValue().entrySet()) {
+          byte[] qualifier = b.getKey();
+          byte[] val = b.getValue();
+          if (alpha.size() == 0) {
+            put.add(family, qualifier, val);
           } else {
-            update.put(e.getKey(), e.getValue().getValue());
+            if (Bytes.toString(family).equals(Constants.COLUMN_FAMILY)) {
+              double currVal = BytesUtil.bytesToDouble(val);
+              put.add(family, qualifier, BytesUtil.doubleToBytes(currVal
+                  * alpha.get(0)));
+            } else {
+              put.add(family, qualifier, val);
+            }
           }
         }
       }
-      output.collect(key, update);
+
+      context.write(key, put);
     }
 
     public static void setAlpha(double a) {
@@ -448,40 +445,59 @@
 
   /** {@inheritDoc} */
   public Matrix set(Matrix B) throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("set MR job : " + this.getPath());
+    Job job = new Job(config, "set MR job : " + this.getPath());
 
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    TableMapReduceUtil.initTableMapJob(B.getPath(), Constants.COLUMN + " "
-        + Constants.ATTRIBUTE + " " + Constants.ALIASEFAMILY + " "
-        + Constants.BLOCK, TableReadMapper.class, ImmutableBytesWritable.class,
-        BatchUpdate.class, jobConf);
-    TableMapReduceUtil.initTableReduceJob(this.getPath(),
-        IdentityTableReduce.class, jobConf);
-
-    JobManager.execute(jobConf);
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(Constants.COLUMN_FAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.ATTRIBUTE));
+    scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
+        .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
+        Put.class, job);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableReducerJob(
+        this.getPath(),
+        org.apache.hadoop.hbase.mapreduce.IdentityTableReducer.class, job);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
     return this;
   }
 
   /** {@inheritDoc} */
   public Matrix set(double alpha, Matrix B) throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("set MR job : " + this.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    TableReadMapper.setAlpha(alpha);
-    TableMapReduceUtil.initTableMapJob(B.getPath(), Constants.COLUMN + " "
-        + Constants.ATTRIBUTE + " " + Constants.ALIASEFAMILY + " "
-        + Constants.BLOCK, TableReadMapper.class, ImmutableBytesWritable.class,
-        BatchUpdate.class, jobConf);
-    TableMapReduceUtil.initTableReduceJob(this.getPath(),
-        IdentityTableReduce.class, jobConf);
+    Job job = new Job(config, "set MR job : " + this.getPath());
 
-    JobManager.execute(jobConf);
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(Constants.COLUMN_FAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.ATTRIBUTE));
+    scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
+    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+    ScanMapper.setAlpha(alpha);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
+        .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
+        Put.class, job);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableReducerJob(
+        this.getPath(),
+        org.apache.hadoop.hbase.mapreduce.IdentityTableReducer.class, job);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
     return this;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java?rev=828314&r1=828313&r2=828314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java Thu Oct 22 05:42:57 2009
@@ -48,6 +48,7 @@
 
   /** a matrix copy of the original copy collected in "eicol" family * */
   public static final String EICOL = "eicol:";
+  public static final String EICOL_FAMILY = "eicol";
   /** a column family collect all values and statuses used during computation * */
   @Deprecated
   public static final String EI = "eival:";
@@ -65,6 +66,7 @@
   public static final String EIIND = EI + "ind";
   /** a matrix collect all the eigen vectors * */
   public static final String EIVEC = "eivec:";
+  public static final String EIVEC_FAMILY = "eivec";
   public static final String MATRIX = "hama.jacobieigenvalue.matrix";
   /** parameters for pivot * */
   public static final String PIVOTROW = "hama.jacobi.pivot.row";

Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java?rev=828314&r1=828313&r2=828314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java Thu Oct 22 05:42:57 2009
@@ -85,6 +85,7 @@
 
     getRowColumnVector();
     setRowColumnVector();
+
     setMatrix(m1);
     setAlphaMatrix(m1);
   }
@@ -149,7 +150,7 @@
     Matrix result = m1.add(0.1, m2);
     assertEquals(value, result.get(0, 0));
   }
-  
+
   public void setMatrix(Matrix m1) throws IOException {
     Matrix a = new DenseMatrix(conf, m1.getRows(), m1.getColumns());
     a.set(m1);



Mime
View raw message