incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r830446 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/io/ src/java/org/apache/hama/matrix/ src/java/org/apache/hama/matrix/algebra/ src/test/org/apache/hama/matrix/
Date Wed, 28 Oct 2009 05:47:48 GMT
Author: edwardyoon
Date: Wed Oct 28 05:47:48 2009
New Revision: 830446

URL: http://svn.apache.org/viewvc?rev=830446&view=rev
Log:
Replacement of NormMap/Reduce

Added:
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
    incubator/hama/trunk/src/test/org/apache/hama/matrix/TestAbstractMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 28 05:47:48 2009
@@ -42,6 +42,7 @@
 
   IMPROVEMENTS
 
+    HAMA-205: Replacement of NormMap/Reduce (edwardyoon)
     HAMA-207: Replacement of Mat-Mat addition Map/Reduce (edwardyoon)
     HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)
     HAMA-216: Removing JobManager in util package (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Wed Oct 28 05:47:48
2009
@@ -43,10 +43,12 @@
 
   public VectorUpdate(String row) {
     this.batchUpdate = new BatchUpdate(row);
+    this.put = new Put(Bytes.toBytes(row));
   }
 
   public VectorUpdate(byte[] row) {
     this.batchUpdate = new BatchUpdate(row);
+    this.put = new Put(row);
   }
 
   public void put(int j, double value) {
@@ -65,6 +67,7 @@
    */
   public void put(String cfName, int j, double value) {
     this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
+    this.put.add(Bytes.toBytes(cfName), Bytes.toBytes(String.valueOf(j)), Bytes.toBytes(value));
   }
 
   public void put(String name, double value) {
@@ -87,6 +90,11 @@
         .toBytes(val));
   }
 
+  public void put(String column, String qualifier, double val) {
+    this.put.add(Bytes.toBytes(column), Bytes.toBytes(qualifier), Bytes
+        .toBytes(val));
+  }
+  
   public void put(String row, int val) {
     this.batchUpdate.put(row, BytesUtil.intToBytes(val));
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Wed Oct 28 05:47:48
2009
@@ -40,35 +40,24 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaAdmin;
 import org.apache.hama.HamaAdminImpl;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.matrix.algebra.JacobiEigenValue;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed;
+import org.apache.hama.matrix.algebra.MatrixNormMapReduce;
 import org.apache.hama.matrix.algebra.TransposeMap;
 import org.apache.hama.matrix.algebra.TransposeReduce;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormCombiner;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormMapper;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormReducer;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixInfinityNormMapper;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixInfinityNormReducer;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixMaxValueNormMapper;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixMaxValueNormReducer;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormCombiner;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormMapper;
-import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormReducer;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.RandomVariable;
 import org.apache.log4j.Logger;
@@ -148,7 +137,7 @@
 
      // It's temporary data.
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(Constants.BLOCK)));
+          .toBytes(Constants.BLOCK_FAMILY)));
       // the following families are used in JacobiEigenValue computation
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
           .toBytes(JacobiEigenValue.EI_COLUMNFAMILY)));
@@ -167,9 +156,8 @@
 
       // Record the matrix type in METADATA_TYPE
       Put put = new Put(Bytes.toBytes(Constants.METADATA));
-      put.add(Constants.ATTRIBUTE, Bytes
-          .toBytes(Constants.METADATA_TYPE), Bytes.toBytes(this.getClass()
-          .getSimpleName()));
+      put.add(Constants.ATTRIBUTE, Bytes.toBytes(Constants.METADATA_TYPE),
+          Bytes.toBytes(this.getClass().getSimpleName()));
       table.put(put);
 
       // the new matrix's reference is 1.
@@ -182,149 +170,166 @@
   }
 
   protected double getNorm1() throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("norm1 MR job : " + this.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(1);
-
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(config);
     Path outDir = new Path(new Path(getType() + "_TMP_norm1_dir_"
         + System.currentTimeMillis()), "out");
     if (fs.exists(outDir))
       fs.delete(outDir, true);
 
-    MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
-        MatrixOneNormMapper.class, MatrixOneNormCombiner.class,
-        MatrixOneNormReducer.class, jobConf);
-
-    // update the out put dir of the job
-    outDir = FileOutputFormat.getOutputPath(jobConf);
-    JobClient.runJob(jobConf);
+    Job job = new Job(config, "norm1 MR job : " + this.getPath());
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
+
+    TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
+        MatrixNormMapReduce.MatrixOneNormMapper.class, IntWritable.class,
+        DoubleWritable.class, job);
+
+    job.setCombinerClass(MatrixNormMapReduce.MatrixOneNormCombiner.class);
+    job.setReducerClass(MatrixNormMapReduce.MatrixOneNormReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(DoubleWritable.class);
+    SequenceFileOutputFormat.setOutputPath(job, outDir);
 
-    // read outputs
-    Path inFile = new Path(outDir, "reduce-out");
-    IntWritable numInside = new IntWritable();
-    DoubleWritable max = new DoubleWritable();
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
     try {
-      reader.next(numInside, max);
-    } finally {
-      reader.close();
+      job.waitForCompletion(true);
+      System.out.println(job.reduceProgress());
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
     }
 
+    // read outputs
+    double result = readOutput(config, fs, outDir);
     fs.delete(outDir.getParent(), true);
-    return max.get();
+    return result;
   }
 
   protected double getMaxvalue() throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("MaxValue Norm MR job : " + this.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(1);
-
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(config);
     Path outDir = new Path(new Path(getType() + "_TMP_normMaxValue_dir_"
         + System.currentTimeMillis()), "out");
     if (fs.exists(outDir))
       fs.delete(outDir, true);
 
-    MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
-        MatrixMaxValueNormMapper.class, MatrixMaxValueNormReducer.class,
-        MatrixMaxValueNormReducer.class, jobConf);
-
-    // update the out put dir of the job
-    outDir = FileOutputFormat.getOutputPath(jobConf);
-    JobClient.runJob(jobConf);
+    Job job = new Job(config, "MaxValue Norm MR job : " + this.getPath());
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
+
+    TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
+        MatrixNormMapReduce.MatrixMaxValueNormMapper.class, IntWritable.class,
+        DoubleWritable.class, job);
+
+    job.setCombinerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
+    job.setReducerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(DoubleWritable.class);
+    SequenceFileOutputFormat.setOutputPath(job, outDir);
 
-    // read outputs
-    Path inFile = new Path(outDir, "part-00000");
-    IntWritable numInside = new IntWritable();
-    DoubleWritable max = new DoubleWritable();
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
     try {
-      reader.next(numInside, max);
-    } finally {
-      reader.close();
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
     }
 
+    // read outputs
+    double result = readOutput(config, fs, outDir);
     fs.delete(outDir.getParent(), true);
-    return max.get();
+    return result;
   }
 
   protected double getInfinity() throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("Infinity Norm MR job : " + this.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(1);
-
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(config);
     Path outDir = new Path(new Path(getType() + "_TMP_normInifity_dir_"
         + System.currentTimeMillis()), "out");
     if (fs.exists(outDir))
       fs.delete(outDir, true);
 
-    MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
-        MatrixInfinityNormMapper.class, MatrixInfinityNormReducer.class,
-        MatrixInfinityNormReducer.class, jobConf);
-
-    // update the out put dir of the job
-    outDir = FileOutputFormat.getOutputPath(jobConf);
+    Job job = new Job(config, "Infinity Norm MR job : " + this.getPath());
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
 
-    JobClient.runJob(jobConf);
+    TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
+        MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class,
+        DoubleWritable.class, job);
+
+    job.setCombinerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
+    job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(DoubleWritable.class);
+    SequenceFileOutputFormat.setOutputPath(job, outDir);
 
-    // read outputs
-    Path inFile = new Path(outDir, "part-00000");
-    IntWritable numInside = new IntWritable();
-    DoubleWritable max = new DoubleWritable();
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
     try {
-      reader.next(numInside, max);
-    } finally {
-      reader.close();
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
     }
 
+    // read outputs
+    double result = readOutput(config, fs, outDir);
     fs.delete(outDir.getParent(), true);
-    return max.get();
+    return result;
   }
 
   protected double getFrobenius() throws IOException {
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("Frobenius Norm MR job : " + this.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(1);
-
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(config);
     Path outDir = new Path(new Path(getType() + "_TMP_normFrobenius_dir_"
         + System.currentTimeMillis()), "out");
     if (fs.exists(outDir))
       fs.delete(outDir, true);
 
-    MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
-        MatrixFrobeniusNormMapper.class, MatrixFrobeniusNormCombiner.class,
-        MatrixFrobeniusNormReducer.class, jobConf);
+    Job job = new Job(config, "Frobenius Norm MR job : " + this.getPath());
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
 
-    // update the out put dir of the job
-    outDir = FileOutputFormat.getOutputPath(jobConf);
+    TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
+        MatrixNormMapReduce.MatrixFrobeniusNormMapper.class, IntWritable.class,
+        DoubleWritable.class, job);
+
+    job.setCombinerClass(MatrixNormMapReduce.MatrixFrobeniusNormCombiner.class);
+    job.setReducerClass(MatrixNormMapReduce.MatrixFrobeniusNormReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(DoubleWritable.class);
+    SequenceFileOutputFormat.setOutputPath(job, outDir);
 
-    JobClient.runJob(jobConf);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
 
     // read outputs
-    Path inFile = new Path(outDir, "part-00000");
+    double result = readOutput(config, fs, outDir);
+    fs.delete(outDir.getParent(), true);
+    return result;
+  }
+
+  private double readOutput(HamaConfiguration config, FileSystem fs, Path outDir)
+      throws IOException {
+    Path inFile = new Path(outDir, "part-r-00000");
     IntWritable numInside = new IntWritable();
-    DoubleWritable sqrt = new DoubleWritable();
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+    DoubleWritable result = new DoubleWritable();
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
     try {
-      reader.next(numInside, sqrt);
+      reader.next(numInside, result);
     } finally {
       reader.close();
     }
-
-    fs.delete(outDir.getParent(), true);
-    return sqrt.get();
+    return result.get();
   }
 
   /** {@inheritDoc} */
@@ -360,8 +365,8 @@
   /** {@inheritDoc} */
   public void setColumnLabel(int column, String name) throws IOException {
     Put put = new Put(Bytes.toBytes(Constants.CINDEX));
-    put.add(Constants.ATTRIBUTE, Bytes.toBytes(String
-        .valueOf(column)), Bytes.toBytes(name));
+    put.add(Constants.ATTRIBUTE, Bytes.toBytes(String.valueOf(column)), Bytes
+        .toBytes(name));
     table.put(put);
   }
 
@@ -385,8 +390,10 @@
   /** {@inheritDoc} */
   public void setDimension(int rows, int columns) throws IOException {
     Put put = new Put(Bytes.toBytes(Constants.METADATA));
-    put.add(Constants.ATTRIBUTE, Bytes.toBytes("rows"), BytesUtil.intToBytes(rows));
-    put.add(Constants.ATTRIBUTE, Bytes.toBytes("columns"), BytesUtil.intToBytes(columns));
+    put.add(Constants.ATTRIBUTE, Bytes.toBytes("rows"), BytesUtil
+        .intToBytes(rows));
+    put.add(Constants.ATTRIBUTE, Bytes.toBytes("columns"), BytesUtil
+        .intToBytes(columns));
     table.put(put);
   }
 
@@ -402,7 +409,7 @@
       TableMapper<ImmutableBytesWritable, Put> implements Configurable {
     private static Double alpha = null;
     private Configuration conf = null;
-    
+
     public void map(ImmutableBytesWritable key, Result value, Context context)
         throws IOException, InterruptedException {
       Put put = new Put(key.get());
@@ -417,9 +424,11 @@
           if (alpha.equals(new Double(1))) {
             put.add(family, qualifier, val);
           } else {
-            if (Bytes.toString(family).equals(Bytes.toString(Constants.COLUMNFAMILY))) {
+            if (Bytes.toString(family).equals(
+                Bytes.toString(Constants.COLUMNFAMILY))) {
               double currVal = BytesUtil.bytesToDouble(val);
-              put.add(family, qualifier, BytesUtil.doubleToBytes(currVal * alpha));
+              put.add(family, qualifier, BytesUtil.doubleToBytes(currVal
+                  * alpha));
             } else {
               put.add(family, qualifier, val);
             }
@@ -486,7 +495,7 @@
     scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
     Float f = new Float(alpha);
     job.getConfiguration().setFloat("set.alpha", f);
-    
+
     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
         .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
         Put.class, job);
@@ -510,8 +519,8 @@
 
   protected void setReference(int reference) throws IOException {
     Put put = new Put(Bytes.toBytes(Constants.METADATA));
-    put.add(Constants.ATTRIBUTE, Bytes
-        .toBytes(Constants.METADATA_REFERENCE), Bytes.toBytes(reference));
+    put.add(Constants.ATTRIBUTE, Bytes.toBytes(Constants.METADATA_REFERENCE),
+        Bytes.toBytes(reference));
     table.put(put);
   }
 
@@ -619,15 +628,15 @@
 
     return allTrue;
   }
-  
+
   public boolean save(String aliasename) throws IOException {
    // mark & update the alias name in "alise:name" meta column.
     // ! one matrix has only one aliasename now.
     Put put = new Put(Bytes.toBytes(Constants.METADATA));
     put.add(Bytes.toBytes(Constants.ALIASEFAMILY), Bytes.toBytes("name"), Bytes
         .toBytes(aliasename));
-    put.add(Constants.ATTRIBUTE, Bytes.toBytes("type"), Bytes
-        .toBytes(this.getType()));
+    put.add(Constants.ATTRIBUTE, Bytes.toBytes("type"), Bytes.toBytes(this
+        .getType()));
     table.put(put);
 
     return hamaAdmin.save(this, aliasename);

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Wed Oct 28 05:47:48
2009
@@ -875,8 +875,8 @@
       * Update the pivot and the eigen values indexed by the pivot
        ************************************************************************/
       vu = new VectorUpdate(pivot_row);
-      vu.put(JacobiEigenValue.EICOL, pivot_col, 0);
-      table.commit(vu.getBatchUpdate());
+      vu.put(JacobiEigenValue.EICOL_FAMILY, pivot_col, 0);
+      table.put(vu.getPut());
 
       state = update(pivot_row, -t, state);
       state = update(pivot_col, t, state);
@@ -910,20 +910,23 @@
       // rotate eigenvectors
       LOG.info("rotating eigenvector");
       for (int i = 0; i < size; i++) {
-        e1 = BytesUtil.bytesToDouble(table.get(
-            BytesUtil.getRowIndex(pivot_row),
-            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
-        e2 = BytesUtil.bytesToDouble(table.get(
-            BytesUtil.getRowIndex(pivot_col),
-            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+        get = new Get(BytesUtil.getRowIndex(pivot_row));
+        e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
+            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+            Bytes.toBytes(String.valueOf(i))));
+       
+        get = new Get(BytesUtil.getRowIndex(pivot_col));
+        e2 = BytesUtil.bytesToDouble(table.get(get).getValue(
+            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+            Bytes.toBytes(String.valueOf(i))));
 
         vu = new VectorUpdate(pivot_row);
-        vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2);
-        table.commit(vu.getBatchUpdate());
+        vu.put(JacobiEigenValue.EIVEC_FAMILY, i, c * e1 - s * e2);
+        table.put(vu.getPut());
 
         vu = new VectorUpdate(pivot_col);
-        vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2);
-        table.commit(vu.getBatchUpdate());
+        vu.put(JacobiEigenValue.EIVEC_FAMILY, i, s * e1 + c * e2);
+        table.put(vu.getPut());
       }
 
       LOG.info("update index...");
@@ -937,14 +940,19 @@
 
   void maxind(int row, int size) throws IOException {
     int m = row + 1;
+    Get get = null;
     if (row + 2 < size) {
-      double max = BytesUtil.bytesToDouble(table
-          .get(BytesUtil.getRowIndex(row),
-              Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue());
+      get = new Get(BytesUtil.getRowIndex(row));
+      
+      double max = BytesUtil.bytesToDouble(table.get(get).getValue(
+          Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY), 
+          Bytes.toBytes(String.valueOf(m))));
       double val;
       for (int i = row + 2; i < size; i++) {
-        val = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
-            Bytes.toBytes(JacobiEigenValue.EICOL + i)).getValue());
+        get = new Get(BytesUtil.getRowIndex(row));
+        val = BytesUtil.bytesToDouble(table.get(get).getValue(
+            Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY), 
+            Bytes.toBytes(String.valueOf(i))));
         if (Math.abs(val) > Math.abs(max)) {
           m = i;
           max = val;
@@ -953,30 +961,36 @@
     }
 
     VectorUpdate vu = new VectorUpdate(row);
-    vu.put(JacobiEigenValue.EIIND, m);
-    table.commit(vu.getBatchUpdate());
+    vu.put(JacobiEigenValue.EI_COLUMNFAMILY, "ind", String.valueOf(m));
+    table.put(vu.getPut());
   }
 
   int update(int row, double value, int state) throws IOException {
-    double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
-        Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
-    int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row),
-        Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue());
+    Get get = new Get(BytesUtil.getRowIndex(row));
+    double e = BytesUtil.bytesToDouble(table.get(get).getValue(
+        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY), 
+        Bytes.toBytes(JacobiEigenValue.EI_VAL)));
+    int changed = BytesUtil.bytesToInt(table.get(get).getValue(
+        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY), 
+        Bytes.toBytes("changed")));
     double y = e;
     e += value;
 
     VectorUpdate vu = new VectorUpdate(row);
-    vu.put(JacobiEigenValue.EIVAL, e);
+    vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EI_VAL, e);
+    
     if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
       changed = 0;
-      vu.put(JacobiEigenValue.EICHANGED, changed);
+      vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+      
       state--;
     } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
       changed = 1;
-      vu.put(JacobiEigenValue.EICHANGED, changed);
+      vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+      
       state++;
     }
-    table.commit(vu.getBatchUpdate());
+    table.put(vu.getPut());
     return state;
   }
 
@@ -984,16 +998,21 @@
   boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
     boolean success = true;
     double e1, ev;
+    Get get = null;
     for (int i = 0; i < e.length; i++) {
-      e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
-          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      get = new Get(BytesUtil.getRowIndex(i));
+      e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
+          Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
+          Bytes.toBytes(JacobiEigenValue.EI_VAL)));
       success &= ((Math.abs(e1 - e[i]) < .0000001));
       if (!success)
         return success;
 
       for (int j = 0; j < E[i].length; j++) {
-        ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
-            Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+        get = new Get(BytesUtil.getRowIndex(i));
+        ev = BytesUtil.bytesToDouble(table.get(get).getValue(
+            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY), 
+            Bytes.toBytes(String.valueOf(j))));
         success &= ((Math.abs(ev - E[i][j]) < .0000001));
         if (!success)
           return success;

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java Wed
Oct 28 05:47:48 2009
@@ -62,6 +62,8 @@
   
   /** a column identify whether the eigen values have been changed * */
   public static final String EICHANGED = EI + "changed";
+  
+  public static final String EICHANGED_STRING = "changed";
   /** a column identify the index of the max absolute value each row * */
   public static final String EIIND = EI + "ind";
   /** a matrix collect all the eigen vectors * */

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java?rev=830446&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
(added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
Wed Oct 28 05:47:48 2009
@@ -0,0 +1,202 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hama.Constants;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+public class MatrixNormMapReduce {
+  public final static IntWritable nKey = new IntWritable(-1);
+
+  /** Infinity Norm */
+  public static class MatrixInfinityNormMapper extends
+      TableMapper<IntWritable, DoubleWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+
+      double rowSum = 0;
+      NavigableMap<byte[], byte[]> v = value
+          .getFamilyMap(Constants.COLUMNFAMILY);
+      for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+        rowSum += Math.abs(BytesUtil.bytesToDouble(e.getValue()));
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSum));
+    }
+  }
+
+  /**
+   * Matrix Infinity Norm Reducer
+   */
+  public static class MatrixInfinityNormReduce extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    static final Logger LOG = Logger.getLogger(MatrixInfinityNormReduce.class);
+    private double max = 0;
+
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (DoubleWritable val : values) {
+        max = Math.max(val.get(), max);
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
+    }
+  }
+
+  /** One Norm Mapper */
+  public static class MatrixOneNormMapper extends
+      TableMapper<IntWritable, DoubleWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+
+      NavigableMap<byte[], byte[]> v = value
+          .getFamilyMap(Constants.COLUMNFAMILY);
+      for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+        context.write(new IntWritable(BytesUtil.bytesToInt(e.getKey())),
+            new DoubleWritable(BytesUtil.bytesToDouble(e.getValue())));
+      }
+    }
+  }
+
+  /** One Norm Combiner * */
+  public static class MatrixOneNormCombiner extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+
+    @Override
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+
+      double partialColSum = 0;
+      for (DoubleWritable val : values) {
+        partialColSum += val.get();
+      }
+
+      context.write(key, new DoubleWritable(partialColSum));
+    }
+  }
+
+  /** One Norm Reducer * */
+  public static class MatrixOneNormReducer extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    private double max = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      double colSum = 0;
+
+      for (DoubleWritable val : values) {
+        colSum += val.get();
+      }
+
+      max = Math.max(Math.abs(colSum), max);
+    }
+
+    public void cleanup(Context context) throws IOException,
+        InterruptedException {
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
+    }
+  }
+
+  /** Frobenius Norm Mapper */
+  public static class MatrixFrobeniusNormMapper extends
+      TableMapper<IntWritable, DoubleWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      double rowSqrtSum = 0;
+
+      NavigableMap<byte[], byte[]> v = value
+          .getFamilyMap(Constants.COLUMNFAMILY);
+      for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+        double cellValue = BytesUtil.bytesToDouble(e.getValue());
+        rowSqrtSum += (cellValue * cellValue);
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSqrtSum));
+    }
+  }
+
+  /** Frobenius Norm Combiner */
+  public static class MatrixFrobeniusNormCombiner extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (DoubleWritable val : values) {
+        sqrtSum += val.get();
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(sqrtSum));
+    }
+  }
+
+  /** Frobenius Norm Reducer */
+  public static class MatrixFrobeniusNormReducer extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    private double sqrtSum = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (DoubleWritable val : values) {
+        sqrtSum += val.get();
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(Math
+          .sqrt(sqrtSum)));
+    }
+  }
+
+  /** MaxValue Norm Mapper * */
+  public static class MatrixMaxValueNormMapper extends
+      TableMapper<IntWritable, DoubleWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      double max = 0;
+
+      NavigableMap<byte[], byte[]> v = value
+          .getFamilyMap(Constants.COLUMNFAMILY);
+      for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+        double cellValue = BytesUtil.bytesToDouble(e.getValue());
+        max = cellValue > max ? cellValue : max;
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
+    }
+  }
+
+  /** MaxValue Norm Reducer */
+  public static class MatrixMaxValueNormReducer extends
+      Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+    private double max = 0;
+
+    @Override
+    public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (DoubleWritable val : values) {
+        max = Math.max(val.get(), max);
+      }
+
+      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
+    }
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestAbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestAbstractMatrix.java?rev=830446&r1=830445&r2=830446&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestAbstractMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestAbstractMatrix.java Wed Oct 28
05:47:48 2009
@@ -48,6 +48,7 @@
   }
 
   public void normTest(Matrix matrix) throws IOException {
+
     double norm1 = matrix.norm(Norm.One);
     double verify_norm1 = MatrixTestCommon.verifyNorm1(matrix);
     gap = norm1 - verify_norm1;
@@ -57,15 +58,15 @@
     double verify_normInf = MatrixTestCommon.verifyNormInfinity(matrix);
     gap = normInfinity - verify_normInf;
     assertTrue(gap < 0.000001 && gap > -0.000001);
-
-    double normMaxValue = matrix.norm(Norm.Maxvalue);
-    double verify_normMV = MatrixTestCommon.verifyNormMaxValue(matrix);
-    gap = normMaxValue - verify_normMV;
-    assertTrue(gap < 0.000001 && gap > -0.000001);
-
+    
     double normFrobenius = matrix.norm(Norm.Frobenius);
     double verify_normFrobenius = MatrixTestCommon.verifyNormFrobenius(matrix);
     gap = normFrobenius - verify_normFrobenius;
     assertTrue(gap < 0.000001 && gap > -0.000001);
+    
+    double normMaxValue = matrix.norm(Norm.Maxvalue);
+    double verify_normMV = MatrixTestCommon.verifyNormMaxValue(matrix);
+    gap = normMaxValue - verify_normMV;
+    assertTrue(gap < 0.000001 && gap > -0.000001);
   }
 }



Mime
View raw message