mahout-commits mailing list archives

From: dlyubi...@apache.org
Subject: svn commit: r1292532 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/math/hadoop/ main/java/org/apache/mahout/math/hadoop/stochasticsvd/ test/java/org/apache/mahout/math/hadoop/ test/java/org/apache/mahout/math/hadoop/stochasticsvd/
Date: Wed, 22 Feb 2012 21:57:28 GMT
Author: dlyubimov
Date: Wed Feb 22 21:57:27 2012
New Revision: 1292532

URL: http://svn.apache.org/viewvc?rev=1292532&view=rev
Log:
MAHOUT-817 PCA options for SSVD (RC1)

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCADenseTest.java
      - copied, changed from r1245615, mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCommonTest.java
      - copied, changed from r1245615, mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDTestsHelper.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java Wed Feb 22 21:57:27 2012
@@ -17,8 +17,12 @@
 
 package org.apache.mahout.math.hadoop;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,10 +43,8 @@ import org.apache.mahout.math.VectorWrit
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
 
 /**
  * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
@@ -64,7 +66,7 @@ import java.util.Iterator;
  */
 public class DistributedRowMatrix implements VectorIterable, Configurable {
   public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files";
-  
+
   private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class);
 
   private final Path inputPath;
@@ -176,13 +178,13 @@ public class DistributedRowMatrix implem
       throw new CardinalityException(numRows, other.numRows());
     }
     Path outPath = new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF));
-    
+
     Configuration initialConf = getConf() == null ? new Configuration() : getConf();
     Configuration conf =
-        MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, 
-                                                            rowPath, 
-                                                            other.rowPath, 
-                                                            outPath, 
+        MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+                                                            rowPath,
+                                                            other.rowPath,
+                                                            outPath,
                                                             other.numCols);
     JobClient.runJob(new JobConf(conf));
     DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols());
@@ -190,6 +192,39 @@ public class DistributedRowMatrix implem
     return out;
   }
 
+  public Vector columnMeans() throws IOException, InterruptedException, ClassNotFoundException,
+  IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException,
+  InvocationTargetException, NoSuchMethodException {
+    return columnMeans("SequentialAccessSparseVector");
+  }
+
+  /**
+   * Returns the column-wise mean of a DistributedRowMatrix
+   *
+   * @param vectorClass
+   *          desired class for the column-wise mean vector e.g.
+   *          RandomAccessSparseVector, DenseVector
+   * @return Vector containing the column-wise mean of this matrix
+   */
+  public Vector columnMeans(String vectorClass) throws IOException,
+      InterruptedException, IllegalArgumentException, SecurityException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException,
+      InvocationTargetException, NoSuchMethodException {
+    Path outputVectorTmpPath =
+        new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+    Configuration initialConf =
+        getConf() == null ? new Configuration() : getConf();
+    String vectorClassFull = "org.apache.mahout.math." + vectorClass;
+    Vector mean =
+        MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath,
+            vectorClassFull);
+    if (!keepTempFiles) {
+      FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+      fs.delete(outputVectorTmpPath, true);
+    }
+    return mean;
+  }
+
   public DistributedRowMatrix transpose() throws IOException {
     Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF));
     Configuration initialConf = getConf() == null ? new Configuration() : getConf();
@@ -207,7 +242,7 @@ public class DistributedRowMatrix implem
       Path outputVectorTmpPath = new Path(outputTmpBasePath,
                                           new Path(Long.toString(System.nanoTime())));
       Configuration conf =
-          TimesSquaredJob.createTimesJobConf(initialConf, 
+          TimesSquaredJob.createTimesJobConf(initialConf,
                                              v,
                                              numRows,
                                              rowPath,
@@ -325,7 +360,7 @@ public class DistributedRowMatrix implem
       col = in.readInt();
       val = in.readDouble();
     }
-    
+
     @Override
     public String toString() {
       return "(" + row + ',' + col + "):" + val;

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java?rev=1292532&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java Wed Feb 22 21:57:27 2012
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.io.Closeables;
+
+/**
+ * MatrixColumnMeansJob is a job for calculating the column-wise mean of a
+ * DistributedRowMatrix. This job can be accessed using
+ * DistributedRowMatrix.columnMeans()
+ */
+public class MatrixColumnMeansJob {
+
+  public static final String VECTOR_CLASS =
+    "DistributedRowMatrix.columnMeans.vector.class";
+
+  public static Vector run(Configuration conf,
+                           Path inputPath,
+                           Path outputVectorTmpPath) throws IOException {
+    return run(conf, inputPath, outputVectorTmpPath, null);
+  }
+
+  /**
+   * Job for calculating column-wise mean of a DistributedRowMatrix
+   *
+   * @param initialConf
+   * @param inputPath
+   *          path to DistributedRowMatrix input
+   * @param outputVectorTmpPath
+   *          path for temporary files created during job
+   * @param vectorClass
+   *          String of desired class for returned vector e.g. DenseVector,
+   *          RandomAccessSparseVector (may be null for {@link DenseVector} )
+   * @return Vector containing column-wise mean of DistributedRowMatrix
+   */
+  public static Vector run(Configuration initialConf,
+                           Path inputPath,
+                           Path outputVectorTmpPath,
+                           String vectorClass) throws IOException {
+
+    try {
+      initialConf.set(VECTOR_CLASS,
+                      vectorClass == null ? DenseVector.class.getName()
+                          : vectorClass);
+
+      @SuppressWarnings("deprecation")
+      JobConf oldApiConf = new JobConf(initialConf);
+
+      org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(oldApiConf,
+                                                              outputVectorTmpPath);
+      Job job = new Job(initialConf);
+      outputVectorTmpPath.getFileSystem(job.getConfiguration())
+                         .delete(outputVectorTmpPath, true);
+      job.setNumReduceTasks(1);
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+      FileInputFormat.addInputPath(job, inputPath);
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+
+      job.setMapperClass(MatrixColumnMeansMapper.class);
+      job.setReducerClass(MatrixColumnMeansReducer.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(VectorWritable.class);
+      job.setOutputKeyClass(IntWritable.class);
+      job.setOutputValueClass(VectorWritable.class);
+      job.submit();
+      job.waitForCompletion(true);
+
+      Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000");
+      SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<VectorWritable>(tmpFile, true, oldApiConf);
+      try {
+        if (iterator.hasNext()) {
+          return iterator.next().get();
+        } else {
+          return (Vector) Class.forName(vectorClass).getConstructor(int.class)
+                               .newInstance(0);
+        }
+      } finally {
+        Closeables.closeQuietly(iterator);
+      }
+    } catch (Throwable thr) {
+      if (thr instanceof IOException)
+        throw (IOException) thr;
+      else
+        throw new IOException(thr);
+    }
+  }
+
+  /**
+   * Mapper for calculation of column-wise mean.
+   */
+  public static class MatrixColumnMeansMapper extends
+      Mapper<IntWritable, VectorWritable, NullWritable, VectorWritable> {
+
+    private Vector runningSum = null;
+    private String vectorClass = null;
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    /**
+     * The mapper computes a running sum of the vectors the task has seen.
+     * Element 0 of the running sum vector contains a count of the number of
+     * vectors that have been seen. The remaining elements contain the
+     * column-wise running sum. Nothing is written at this stage
+     */
+    @Override
+    public void map(IntWritable r, VectorWritable v, Context context)
+      throws IOException {
+      if (runningSum == null) {
+        try {
+          /*
+           * If this is the first vector the mapper has seen, instantiate a new
+           * vector using the parameter VECTOR_CLASS
+           */
+          runningSum =
+            (Vector) Class.forName(vectorClass).getConstructor(int.class)
+                          .newInstance(v.get().size() + 1);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        runningSum.set(0, 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get());
+      } else {
+        runningSum.set(0, runningSum.get(0) + 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS);
+      }
+    }
+
+    /**
+     * The column-wise sum is written at the cleanup stage. A single reducer is
+     * forced so null can be used for the key
+     */
+    @Override
+    public void cleanup(Context context) throws InterruptedException,
+      IOException {
+      if (runningSum != null) {
+        context.write(NullWritable.get(), new VectorWritable(runningSum));
+      }
+    }
+
+  }
+
+  /**
+   * The reducer adds the partial column-wise sums from each of the mappers to
+   * compute the total column-wise sum. The total sum is then divided by the
+   * total count of vectors to determine the column-wise mean.
+   */
+  public static class MatrixColumnMeansReducer extends
+      Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private static final IntWritable one = new IntWritable(1);
+    private String vectorClass = null;
+    Vector outputVector = null;
+    VectorWritable outputVectorWritable = new VectorWritable();
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    @Override
+    public void reduce(NullWritable n,
+                       Iterable<VectorWritable> vectors,
+                       Context context) throws IOException,
+      InterruptedException {
+
+      /**
+       * Add together partial column-wise sums from mappers
+       */
+      for (VectorWritable v : vectors) {
+        if (outputVector == null) {
+          outputVector = v.get();
+        } else {
+          outputVector.assign(v.get(), Functions.PLUS);
+        }
+      }
+
+      /**
+       * Divide total column-wise sum by count of vectors, which corresponds to
+       * the number of rows in the DistributedRowMatrix
+       */
+      if (outputVector != null) {
+        outputVectorWritable.set(outputVector.viewPart(1,
+                                                       outputVector.size() - 1)
+                                             .divide(outputVector.get(0)));
+        context.write(one, outputVectorWritable);
+      } else {
+        try {
+          Vector emptyVector =
+            (Vector) Class.forName(vectorClass).getConstructor(int.class)
+                          .newInstance(0);
+          context.write(one, new VectorWritable(emptyVector));
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+}
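
To make the accumulator layout used by the mapper and reducer above concrete: element 0 of the running-sum vector carries the row count and elements 1..n carry the column sums, so the reducer's final divide yields the means. The following is a small local sketch of that same encoding (an illustration only, using the Mahout vector operations already imported by the job), not code from the commit.

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.function.Functions;

    public class RunningSumSketch {
      public static void main(String[] args) {
        Vector[] rows = {
          new DenseVector(new double[] { 1, 2, 3 }),
          new DenseVector(new double[] { 3, 4, 5 })
        };

        Vector runningSum = null;
        for (Vector row : rows) {
          if (runningSum == null) {
            // element 0 = count, elements 1..n = column-wise sums
            runningSum = new DenseVector(row.size() + 1);
            runningSum.set(0, 1);
            runningSum.viewPart(1, row.size()).assign(row);
          } else {
            runningSum.set(0, runningSum.get(0) + 1);
            runningSum.viewPart(1, row.size()).assign(row, Functions.PLUS);
          }
        }

        // the reducer's step: divide the column sums by the row count
        Vector means =
          runningSum.viewPart(1, runningSum.size() - 1).divide(runningSum.get(0));
        System.out.println(means); // column means: 2.0, 3.0, 4.0
      }
    }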

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java Wed Feb 22 21:57:27 2012
@@ -54,6 +54,7 @@ import org.apache.mahout.math.DenseVecto
 import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
 import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep;
 
 /**
@@ -65,6 +66,9 @@ public class ABtDenseOutJob {
 
   public static final String PROP_BT_PATH = "ssvd.Bt.path";
   public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast";
+  public static final String PROP_SB_PATH = "ssvdpca.sb.path";
+  public static final String PROP_SQ_PATH = "ssvdpca.sq.path";
+  public static final String PROP_XI_PATH = "ssvdpca.xi.path";
 
   private ABtDenseOutJob() {
   }
@@ -99,6 +103,11 @@ public class ABtDenseOutJob {
     private boolean distributedBt;
     private Path[] btLocalPath;
     private Configuration localFsConfig;
+    /*
+     * xi and s_q are PCA-related corrections, per MAHOUT-817
+     */
+    protected Vector xi;
+    protected Vector sq;
 
     @Override
     protected void map(Writable key, VectorWritable value, Context context)
@@ -257,8 +266,26 @@ public class ABtDenseOutJob {
               /*
                * assume btVec is dense
                */
-              for (int s = 0; s < kp; s++) {
-                yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s);
+              if (xi != null) {
+                /*
+                 * MAHOUT-817: PCA correction for B'. The whole computation
+                 * loop is duplicated so the PCA check doesn't have to happen
+                 * at the individual element level. It looks bulkier this way
+                 * but is perhaps less wasteful on the CPU.
+                 */
+                for (int s = 0; s < kp; s++) {
+                  // code defensively against shortened xi
+                  double xii = xi.size() > btIndex ? xi.get(btIndex) : 0.0;
+                  yiCols[s][j - aRowBegin] +=
+                    aEl.get() * (btVec.getQuick(s) - xii * sq.get(s));
+                }
+              } else {
+                /*
+                 * no PCA correction
+                 */
+                for (int s = 0; s < kp; s++) {
+                  yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s);
+                }
               }
 
             }
@@ -287,27 +314,31 @@ public class ABtDenseOutJob {
     protected void setup(Context context) throws IOException,
       InterruptedException {
 
-      int k =
-        Integer.parseInt(context.getConfiguration().get(QRFirstStep.PROP_K));
-      int p =
-        Integer.parseInt(context.getConfiguration().get(QRFirstStep.PROP_P));
+      Configuration conf = context.getConfiguration();
+      int k = Integer.parseInt(conf.get(QRFirstStep.PROP_K));
+      int p = Integer.parseInt(conf.get(QRFirstStep.PROP_P));
       kp = k + p;
 
       outKey = new SplitPartitionedWritable(context);
 
-      blockHeight =
-        context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT,
-                                          -1);
-      distributedBt = context.getConfiguration().get(PROP_BT_BROADCAST) != null;
+      blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
+      distributedBt = conf.get(PROP_BT_BROADCAST) != null;
       if (distributedBt) {
-
-        btLocalPath =
-          DistributedCache.getLocalCacheFiles(context.getConfiguration());
-
+        btLocalPath = DistributedCache.getLocalCacheFiles(conf);
         localFsConfig = new Configuration();
         localFsConfig.set("fs.default.name", "file:///");
       }
 
+      /*
+       * PCA-related corrections (MAHOUT-817)
+       */
+      String xiPathStr = conf.get(PROP_XI_PATH);
+      if (xiPathStr != null) {
+        xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
+        sq =
+          SSVDHelper.loadAndSumUpVectors(new Path(conf.get(PROP_SQ_PATH)), conf);
+      }
+
     }
   }
 
@@ -346,14 +377,21 @@ public class ABtDenseOutJob {
     protected OutputCollector<Writable, VectorWritable> rhatCollector;
     protected QRFirstStep qr;
     protected Vector yiRow;
+    protected Vector sb;
 
     @Override
     protected void setup(Context context) throws IOException,
       InterruptedException {
-      blockHeight =
-        context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT,
-                                          -1);
-
+      Configuration conf = context.getConfiguration();
+      blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
+      String sbPathStr = conf.get(PROP_SB_PATH);
+
+      /*
+       * PCA-related corrections (MAHOUT-817)
+       */
+      if (sbPathStr != null) {
+        sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf);
+      }
     }
 
     protected void setupBlock(Context context, SplitPartitionedWritable spw)
@@ -407,6 +445,12 @@ public class ABtDenseOutJob {
         }
 
         key.setTaskItemOrdinal(blockBase + k);
+
+        // pca offset correction if any
+        if (sb != null) {
+          yiRow.assign(sb, Functions.MINUS);
+        }
+
         qr.collect(key, yiRow);
       }
 
@@ -466,6 +510,9 @@ public class ABtDenseOutJob {
   public static void run(Configuration conf,
                          Path[] inputAPaths,
                          Path inputBtGlob,
+                         Path xiPath,
+                         Path sqPath,
+                         Path sbPath,
                          Path outputPath,
                          int aBlockRows,
                          int minSplitSize,
@@ -509,6 +556,15 @@ public class ABtDenseOutJob {
     job.getConfiguration().setInt(QRFirstStep.PROP_P, p);
     job.getConfiguration().set(PROP_BT_PATH, inputBtGlob.toString());
 
+    /*
+     * PCA-related options, MAHOUT-817
+     */
+    if (xiPath != null) {
+      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
+      job.getConfiguration().set(PROP_SB_PATH, sbPath.toString());
+      job.getConfiguration().set(PROP_SQ_PATH, sqPath.toString());
+    }
+
     job.setNumReduceTasks(numReduceTasks);
 
     // broadcast Bt files if required.
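
One way to read the element-level PCA correction in the mapper above (this is an interpretation of the code, not text from the commit): with xi the column-mean vector and s_q = Q^T 1 (the per-column sums of Q accumulated by the BtJob mapper below),

    B_hat = Q^T (A - 1 xi^T) = Q^T A - (Q^T 1) xi^T = B - s_q xi^T

so row j of B_hat^T is bt_j - xi_j * s_q, which is exactly the btVec.getQuick(s) - xii * sq.get(s) expression in the loop. The sb subtraction in the reducer then handles the mean offset on the A side; s_b itself is accumulated in the BtJob reducer further down.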

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Wed Feb 22 21:57:27 2012
@@ -51,6 +51,8 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.PlusMult;
 import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRLastStep;
 
 /**
@@ -80,6 +82,9 @@ public final class BtJob {
   public static final String OUTPUT_Q = "Q";
   public static final String OUTPUT_BT = "part";
   public static final String OUTPUT_BBT = "bbt";
+  public static final String OUTPUT_SQ = "sq";
+  public static final String OUTPUT_SB = "sb";
+
   public static final String PROP_QJOB_PATH = "ssvd.QJob.path";
   public static final String PROP_OUPTUT_BBT_PRODUCTS =
     "ssvd.BtJob.outputBBtProducts";
@@ -87,6 +92,8 @@ public final class BtJob {
     "ssvd.outerProdBlockHeight";
   public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
 
+  public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+
   static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1;
 
   private BtJob() {
@@ -105,16 +112,9 @@ public final class BtJob {
     private SparseRowBlockAccumulator btCollector;
     private Context mapContext;
 
-    @Override
-    protected void cleanup(Context context) throws IOException,
-      InterruptedException {
-      IOUtils.close(closeables);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void outputQRow(Writable key, Writable value) throws IOException {
-      outputs.getCollector(OUTPUT_Q, null).collect(key, value);
-    }
+    // pca stuff
+    private Vector sqAccum;
+    private boolean computeSq;
 
     /**
      * We maintain A and QtHat inputs partitioned the same way, so we
@@ -136,6 +136,14 @@ public final class BtJob {
       // make sure Qs are inheriting A row labels.
       outputQRow(key, qRowValue);
 
+      // MAHOUT-817
+      if (computeSq) {
+        if (sqAccum == null) {
+          sqAccum = new DenseVector(kp);
+        }
+        sqAccum.assign(qRow, Functions.PLUS);
+      }
+
       if (btRow == null) {
         btRow = new DenseVector(kp);
       }
@@ -166,7 +174,9 @@ public final class BtJob {
       InterruptedException {
       super.setup(context);
 
-      Path qJobPath = new Path(context.getConfiguration().get(PROP_QJOB_PATH));
+      Configuration conf = context.getConfiguration();
+
+      Path qJobPath = new Path(conf.get(PROP_QJOB_PATH));
 
       /*
        * actually this is kind of dangerous because this routine thinks we need
@@ -182,37 +192,35 @@ public final class BtJob {
       SequenceFileValueIterator<DenseBlockWritable> qhatInput =
         new SequenceFileValueIterator<DenseBlockWritable>(qInputPath,
                                                           true,
-                                                          context.getConfiguration());
+                                                          conf);
       closeables.addFirst(qhatInput);
 
       /*
        * read all r files _in order of task ids_, i.e. partitions (aka group
        * nums).
-       * 
+       *
        * Note: if broadcast option is used, this comes from distributed cache
        * files rather than hdfs path.
        */
 
       SequenceFileDirValueIterator<VectorWritable> rhatInput;
 
-      boolean distributedRHat =
-        context.getConfiguration().get(PROP_RHAT_BROADCAST) != null;
+      boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null;
       if (distributedRHat) {
 
-        Path[] rFiles =
-          DistributedCache.getLocalCacheFiles(context.getConfiguration());
+        Path[] rFiles = DistributedCache.getLocalCacheFiles(conf);
 
         Validate.notNull(rFiles,
                          "no RHat files in distributed cache job definition");
 
-        Configuration conf = new Configuration();
-        conf.set("fs.default.name", "file:///");
+        Configuration lconf = new Configuration();
+        lconf.set("fs.default.name", "file:///");
 
         rhatInput =
           new SequenceFileDirValueIterator<VectorWritable>(rFiles,
-                                                           SSVDSolver.PARTITION_COMPARATOR,
+                                                           SSVDHelper.PARTITION_COMPARATOR,
                                                            true,
-                                                           conf);
+                                                           lconf);
 
       } else {
         Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
@@ -220,15 +228,15 @@ public final class BtJob {
           new SequenceFileDirValueIterator<VectorWritable>(rPath,
                                                            PathType.GLOB,
                                                            null,
-                                                           SSVDSolver.PARTITION_COMPARATOR,
+                                                           SSVDHelper.PARTITION_COMPARATOR,
                                                            true,
-                                                           context.getConfiguration());
+                                                           conf);
       }
 
       Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!");
 
       closeables.addFirst(rhatInput);
-      outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+      outputs = new MultipleOutputs(new JobConf(conf));
       closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
 
       qr = new QRLastStep(qhatInput, rhatInput, blockNum);
@@ -258,12 +266,36 @@ public final class BtJob {
         };
 
       btCollector =
-        new SparseRowBlockAccumulator(context.getConfiguration()
-                                             .getInt(PROP_OUTER_PROD_BLOCK_HEIGHT,
-                                                     -1),
-                                      btBlockCollector);
+        new SparseRowBlockAccumulator(conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT,
+                                                  -1), btBlockCollector);
       closeables.addFirst(btCollector);
 
+      // MAHOUT-817
+      computeSq = (conf.get(PROP_XI_PATH) != null);
+
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+      try {
+        if (sqAccum != null) {
+          /*
+           * hack: we will output sq partial sums with index -1 for summation.
+           */
+          SparseRowBlockWritable sbrw = new SparseRowBlockWritable(1);
+          sbrw.plusRow(0, sqAccum);
+          LongWritable lw = new LongWritable(-1);
+          context.write(lw, sbrw);
+        }
+      } finally {
+        IOUtils.close(closeables);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void outputQRow(Writable key, Writable value) throws IOException {
+      outputs.getCollector(OUTPUT_Q, null).collect(key, value);
     }
   }
 
@@ -316,28 +348,40 @@ public final class BtJob {
     private final IntWritable btKey = new IntWritable();
     private final VectorWritable btValue = new VectorWritable();
 
+    // MAHOUT-817
+    private Vector xi;
+    private final PlusMult pmult = new PlusMult(0);
+    private Vector sbAccum;
+
     @Override
     protected void setup(Context context) throws IOException,
       InterruptedException {
 
-      blockHeight =
-        context.getConfiguration().getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
+      Configuration conf = context.getConfiguration();
+      blockHeight = conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
 
-      outputBBt =
-        context.getConfiguration().getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false);
+      outputBBt = conf.getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false);
 
       if (outputBBt) {
-        int k = context.getConfiguration().getInt(QJob.PROP_K, -1);
-        int p = context.getConfiguration().getInt(QJob.PROP_P, -1);
+        int k = conf.getInt(QJob.PROP_K, -1);
+        int p = conf.getInt(QJob.PROP_P, -1);
 
         Validate.isTrue(k > 0, "invalid k parameter");
         Validate.isTrue(p >= 0, "invalid p parameter");
         mBBt = new UpperTriangular(k + p);
 
-        outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
-        closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+      }
 
+      String xiPathStr = conf.get(PROP_XI_PATH);
+      if (xiPathStr != null) {
+        xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
       }
+
+      if (outputBBt || xi != null) {
+        outputs = new MultipleOutputs(new JobConf(conf));
+        closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+      }
+
     }
 
     @Override
@@ -351,6 +395,19 @@ public final class BtJob {
         accum.plusBlock(bw);
       }
 
+      // MAHOUT-817:
+      if (key.get() == -1L) {
+
+        Vector sq = accum.getRows()[0];
+
+        @SuppressWarnings("unchecked")
+        OutputCollector<IntWritable, VectorWritable> sqOut =
+          outputs.getCollector(OUTPUT_SQ, null);
+
+        sqOut.collect(new IntWritable(0), new VectorWritable(sq));
+        return;
+      }
+
       /*
        * at this point, sum of rows should be in accum, so we just generate
        * outer self product of it and add to BBt accumulator.
@@ -378,6 +435,18 @@ public final class BtJob {
           }
         }
 
+        // MAHOUT-817
+        if (xi != null) {
+          // code defensively against shortened xi
+          int btIndex = btKey.get();
+          double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0;
+          // compute s_b
+          pmult.setMultiplicator(xii);
+          if (sbAccum == null)
+            sbAccum = new DenseVector(btRow.size());
+          sbAccum.assign(btRow, pmult);
+        }
+
       }
     }
 
@@ -396,17 +465,27 @@ public final class BtJob {
           collector.collect(new IntWritable(),
                             new VectorWritable(new DenseVector(mBBt.getData())));
         }
+
+        // MAHOUT-817
+        if (sbAccum != null) {
+          @SuppressWarnings("unchecked")
+          OutputCollector<IntWritable, VectorWritable> collector =
+            outputs.getCollector(OUTPUT_SB, null);
+
+          collector.collect(new IntWritable(), new VectorWritable(sbAccum));
+
+        }
       } finally {
         IOUtils.close(closeables);
       }
 
     }
-
   }
 
   public static void run(Configuration conf,
                          Path[] inputPathA,
                          Path inputPathQJob,
+                         Path xiPath,
                          Path outputPath,
                          int minSplitSize,
                          int k,
@@ -433,6 +512,19 @@ public final class BtJob {
                                      IntWritable.class,
                                      VectorWritable.class);
     }
+    if (xiPath != null) {
+      // compute pca -related stuff as well
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_SQ,
+                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_SB,
+                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
+    }
 
     /*
      * HACK: we use old api multiple outputs since they are not available in the
@@ -479,6 +571,13 @@ public final class BtJob {
     job.setNumReduceTasks(numReduceTasks);
 
     /*
+     * PCA-related options, MAHOUT-817
+     */
+    if (xiPath != null) {
+      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
+    }
+
+    /*
     * we can broadcast Rhat files since all of them are required by each job,
      * but not Q files which correspond to splits of A (so each split of A will
      * require only particular Q file, each time different one).

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java Wed Feb 22 21:57:27 2012
@@ -17,9 +17,19 @@
 
 package org.apache.mahout.math.hadoop.stochasticsvd;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 
@@ -33,14 +43,14 @@ public class Omega {
   private final long seed;
   private final int kp;
 
-  public Omega(long seed, int k, int p) {
+  public Omega(long seed, int kp) {
     this.seed = seed;
-    kp = k + p;
+    this.kp = kp;
   }
 
   /**
    * Get omega element at (x,y) uniformly distributed within [-1...1)
-   * 
+   *
    * @param row
    *          omega row
    * @param column
@@ -98,6 +108,68 @@ public class Omega {
     }
   }
 
+  /*
+   * computes t(Omega) %*% v in a multithreaded fashion
+   */
+  public Vector mutlithreadedTRightMultiply(final Vector v) {
+
+    int nThreads = Runtime.getRuntime().availableProcessors();
+    ExecutorService es =
+      new ThreadPoolExecutor(nThreads,
+                             nThreads,
+                             1,
+                             TimeUnit.SECONDS,
+                             new ArrayBlockingQueue<Runnable>(kp));
+
+    try {
+
+      List<Future<Double>> dotFutures = new ArrayList<Future<Double>>(kp);
+
+      for (int i = 0; i < kp; i++) {
+        final int index = i;
+
+        Future<Double> dotFuture = es.submit(new Callable<Double>() {
+          @Override
+          public Double call() throws Exception {
+            double result = 0.0;
+            if (v.isDense()) {
+              for (int k = 0; k < v.size(); k++)
+                // it's ok, this is reentrant
+                result += getQuick(k, index) * v.getQuick(k);
+
+            } else {
+              for (Iterator<Vector.Element> iter = v.iterateNonZero(); iter.hasNext();) {
+                Vector.Element el = iter.next();
+                int k = el.index();
+                result += getQuick(k, index) * el.get();
+              }
+            }
+            return result;
+          }
+        });
+        dotFutures.add(dotFuture);
+      }
+
+      try {
+        Vector res = new DenseVector(kp);
+        for (int i = 0; i < kp; i++) {
+          res.setQuick(i, dotFutures.get(i).get());
+        }
+        return res;
+      } catch (InterruptedException exc) {
+        throw new RuntimeException("Interrupted", exc);
+      } catch (ExecutionException exc) {
+        if (exc.getCause() instanceof RuntimeException)
+          throw (RuntimeException) exc.getCause();
+        else
+          throw new RuntimeException(exc.getCause());
+      }
+
+    } finally {
+      es.shutdown();
+    }
+  }
+
   protected void accumDots(int aIndex, double aElement, double[] yRow) {
     for (int i = 0; i < kp; i++) {
       yRow[i] += getQuick(aIndex, i) * aElement;
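
The new method above computes the k+p independent dot products of v with the columns of Omega and parallelizes them across a thread pool. A single-threaded, method-level sketch of the same computation (it would live inside Omega and reuses its getQuick(row, column) and kp; shown only to clarify what the parallel version produces):

    public Vector tRightMultiply(Vector v) {
      Vector res = new DenseVector(kp);
      for (int i = 0; i < kp; i++) {
        double dot = 0.0;
        for (int k = 0; k < v.size(); k++) {
          dot += getQuick(k, i) * v.getQuick(k);
        }
        res.setQuick(i, dot);
      }
      return res;
    }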

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java Wed Feb 22 21:57:27 2012
@@ -40,6 +40,7 @@ import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
 import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep;
 
 /**
@@ -60,7 +61,9 @@ public final class QJob {
   public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
   public static final String PROP_K = QRFirstStep.PROP_K;
   public static final String PROP_P = QRFirstStep.PROP_P;
-  public static final String PROP_AROWBLOCK_SIZE = QRFirstStep.PROP_AROWBLOCK_SIZE;
+  public static final String PROP_SB_PATH = "ssvdpca.sb.path";
+  public static final String PROP_AROWBLOCK_SIZE =
+    QRFirstStep.PROP_AROWBLOCK_SIZE;
 
   public static final String OUTPUT_RHAT = "R";
   public static final String OUTPUT_QHAT = "QHat";
@@ -77,23 +80,29 @@ public final class QJob {
     private SplitPartitionedWritable qHatKey;
     private SplitPartitionedWritable rHatKey;
     private Vector yRow;
+    private Vector sb;
     private Omega omega;
     private int kp;
 
-
     private QRFirstStep qr;
 
     @Override
     protected void setup(Context context) throws IOException,
       InterruptedException {
-      
-      int k = Integer.parseInt(context.getConfiguration().get(PROP_K));
-      int p = Integer.parseInt(context.getConfiguration().get(PROP_P));
+
+      Configuration conf = context.getConfiguration();
+      int k = Integer.parseInt(conf.get(PROP_K));
+      int p = Integer.parseInt(conf.get(PROP_P));
       kp = k + p;
-      long omegaSeed = Long.parseLong(context.getConfiguration().get(PROP_OMEGA_SEED));
-      omega = new Omega(omegaSeed, k, p);
+      long omegaSeed = Long.parseLong(conf.get(PROP_OMEGA_SEED));
+      omega = new Omega(omegaSeed, k + p);
+
+      String sbPathStr = conf.get(PROP_SB_PATH);
+      if (sbPathStr != null) {
+        sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf);
+      }
 
-      outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+      outputs = new MultipleOutputs(new JobConf(conf));
       closeables.addFirst(new Closeable() {
         @Override
         public void close() throws IOException {
@@ -103,6 +112,7 @@ public final class QJob {
 
       qHatKey = new SplitPartitionedWritable(context);
       rHatKey = new SplitPartitionedWritable(context);
+
       OutputCollector<Writable, DenseBlockWritable> qhatCollector =
         new OutputCollector<Writable, DenseBlockWritable>() {
 
@@ -114,6 +124,7 @@ public final class QJob {
             qHatKey.incrementItemOrdinal();
           }
         };
+
       OutputCollector<Writable, VectorWritable> rhatCollector =
         new OutputCollector<Writable, VectorWritable>() {
 
@@ -126,31 +137,31 @@ public final class QJob {
           }
         };
 
-      qr =
-        new QRFirstStep(context.getConfiguration(),
-                        qhatCollector,
-                        rhatCollector);
+      qr = new QRFirstStep(conf, qhatCollector, rhatCollector);
       closeables.addFirst(qr);// important: qr closes first!!
-      yRow=new DenseVector(kp);
+      yRow = new DenseVector(kp);
     }
-    
+
     @Override
     protected void map(Writable key, VectorWritable value, Context context)
       throws IOException, InterruptedException {
       omega.computeYRow(value.get(), yRow);
+      if (sb != null) {
+        yRow.assign(sb, Functions.MINUS);
+      }
       qr.collect(key, yRow);
     }
 
-
-
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
       IOUtils.close(closeables);
     }
   }
 
   public static void run(Configuration conf,
                          Path[] inputPaths,
+                         Path sbPath,
                          Path outputPath,
                          int aBlockRows,
                          int minSplitSize,
@@ -161,18 +172,16 @@ public final class QJob {
     InterruptedException, IOException {
 
     JobConf oldApiJob = new JobConf(conf);
-    MultipleOutputs
-      .addNamedOutput(oldApiJob,
-                      OUTPUT_QHAT,
-                      org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-                      SplitPartitionedWritable.class,
-                      DenseBlockWritable.class);
-    MultipleOutputs
-      .addNamedOutput(oldApiJob,
-                      OUTPUT_RHAT,
-                      org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-                      SplitPartitionedWritable.class,
-                      VectorWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_QHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   DenseBlockWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_RHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   VectorWritable.class);
 
     Job job = new Job(oldApiJob);
     job.setJobName("Q-job");
@@ -203,6 +212,9 @@ public final class QJob {
     job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
     job.getConfiguration().setInt(PROP_K, k);
     job.getConfiguration().setInt(PROP_P, p);
+    if (sbPath != null) {
+      job.getConfiguration().set(PROP_SB_PATH, sbPath.toString());
+    }
 
     /*
      * number of reduce tasks doesn't matter. we don't actually send anything to

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java Wed Feb 22 21:57:27 2012
@@ -17,23 +17,18 @@
 package org.apache.mahout.math.hadoop.stochasticsvd;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.MatrixColumnMeansJob;
 
 /**
  * Mahout CLI adapter for SSVDSolver
@@ -81,6 +76,13 @@ public class SSVDCli extends AbstractJob
               "br",
               "whether use distributed cache to broadcast matrices wherever possible",
               String.valueOf(true));
+    addOption("pca",
+              "pca",
+              "run in pca mode: compute column-wise mean and subtract from input",
+              String.valueOf(false));
+    addOption("pcaOffset",
+              "xi",
+              "path(glob) of external pca mean (optional, dont compute, use external mean");
     addOption(DefaultOptionCreator.overwriteOption().create());
 
     Map<String, List<String>> pargs = parseArguments(args);
@@ -101,6 +103,10 @@ public class SSVDCli extends AbstractJob
     boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma"));
     int reduceTasks = Integer.parseInt(getOption("reduceTasks"));
     boolean broadcast = Boolean.parseBoolean(getOption("broadcast"));
+    String xiPathStr = getOption("pcaOffset");
+    Path xiPath = xiPathStr == null ? null : new Path(xiPathStr);
+    boolean pca = Boolean.parseBoolean(getOption("pca")) || xiPath != null;
+
     boolean overwrite =
       pargs.containsKey(keyFor(DefaultOptionCreator.OVERWRITE_OPTION));
 
@@ -109,14 +115,17 @@ public class SSVDCli extends AbstractJob
       throw new IOException("No Hadoop configuration present");
     }
 
+    Path[] inputPaths = new Path[] { getInputPath() };
+
+    // MAHOUT-817
+    if (pca && xiPath == null) {
+      xiPath = new Path(getTempPath(), "xi");
+      MatrixColumnMeansJob.run(conf, inputPaths[0], getTempPath());
+    }
+
     SSVDSolver solver =
-      new SSVDSolver(conf,
-                     new Path[] { getInputPath() },
-                     getTempPath(),
-                     r,
-                     k,
-                     p,
-                     reduceTasks);
+      new SSVDSolver(conf, inputPaths, getTempPath(), r, k, p, reduceTasks);
+
     solver.setMinSplitSize(minSplitSize);
     solver.setComputeU(computeU);
     solver.setComputeV(computeV);
@@ -127,6 +136,7 @@ public class SSVDCli extends AbstractJob
     solver.setQ(q);
     solver.setBroadcast(broadcast);
     solver.setOverwrite(overwrite);
+    solver.setPcaMeanPath(xiPath);
 
     solver.run();
 
@@ -135,24 +145,8 @@ public class SSVDCli extends AbstractJob
 
     fs.mkdirs(getOutputPath());
 
-    SequenceFile.Writer sigmaW = null;
-
-    try {
-      sigmaW =
-        SequenceFile.createWriter(fs,
-                                  conf,
-                                  getOutputPath("sigma"),
-                                  NullWritable.class,
-                                  VectorWritable.class);
-      Writable sValues =
-        new VectorWritable(new DenseVector(Arrays.copyOf(solver.getSingularValues(),
-                                                         k),
-                                           true));
-      sigmaW.append(NullWritable.get(), sValues);
-
-    } finally {
-      Closeables.closeQuietly(sigmaW);
-    }
+    Vector svalues = solver.getSingularValues().viewPart(0, k);
+    SSVDHelper.saveVector(svalues, getOutputPath("sigma"), conf);
 
     if (computeU) {
       FileStatus[] uFiles = fs.globStatus(new Path(solver.getUPath()));
@@ -169,7 +163,6 @@ public class SSVDCli extends AbstractJob
           fs.rename(vf.getPath(), getOutputPath());
         }
       }
-
     }
     return 0;
   }
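
A hedged sketch of how the new PCA options might be exercised through the CLI adapter above. Only --pca and --pcaOffset come from this patch; the input/output flags are the standard AbstractJob options, and the paths are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.math.hadoop.stochasticsvd.SSVDCli;

    public class SsvdPcaDriverSketch {
      public static void main(String[] args) throws Exception {
        String[] ssvdArgs = {
          "-i", "/data/A",         // hypothetical input DistributedRowMatrix
          "-o", "/out/ssvd-pca",   // hypothetical output directory
          "--pca", "true"          // compute the column mean and run SSVD on the centered input
          // alternatively, reuse a precomputed mean: "--pcaOffset", "/data/A-colmeans"
        };
        ToolRunner.run(new Configuration(), new SSVDCli(), ssvdArgs);
      }
    }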

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java?rev=1292532&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java Wed Feb 22 21:57:27 2012
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+/**
+ * Set of small file manipulation helpers.
+ *
+ */
+
+public class SSVDHelper {
+
+  /**
+   * load single vector from an hdfs file (possibly presented as glob).
+   */
+  static Vector loadVector(Path glob, Configuration conf) throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> iter =
+      new SequenceFileDirValueIterator<VectorWritable>(glob,
+                                                       PathType.GLOB,
+                                                       null,
+                                                       null,
+                                                       true,
+                                                       conf);
+
+    try {
+      if (!iter.hasNext())
+        throw new IOException("Empty input while reading vector");
+      VectorWritable vw = iter.next();
+
+      if (iter.hasNext())
+        throw new IOException("Unexpected data after the end of vector file");
+
+      return vw.get();
+
+    } finally {
+      Closeables.closeQuietly(iter);
+    }
+  }
+
+  /**
+   * save single vector into hdfs file.
+   *
+   * @param v
+   *          vector to save
+   * @param vectorFilePath
+   * @param conf
+   * @throws IOException
+   */
+  public static void saveVector(Vector v,
+                                Path vectorFilePath,
+                                Configuration conf) throws IOException {
+    VectorWritable vw = new VectorWritable(v);
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Writer w =
+      new SequenceFile.Writer(fs,
+                              conf,
+                              vectorFilePath,
+                              IntWritable.class,
+                              VectorWritable.class);
+    try {
+      w.append(new IntWritable(), vw);
+    } finally {
+      /*
+       * this is a writer, no quiet close please. we must bail out on incomplete
+       * close.
+       */
+      w.close();
+    }
+  }
+
+  /**
+   * sniff label type in the input files
+   */
+  static Class<? extends Writable> sniffInputLabelType(Path[] inputPath,
+                                                       Configuration conf)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (Path p : inputPath) {
+      FileStatus[] fstats = fs.globStatus(p);
+      if (fstats == null || fstats.length == 0) {
+        continue;
+      }
+
+      FileStatus firstSeqFile;
+      if (!fstats[0].isDir()) {
+        firstSeqFile = fstats[0];
+      } else {
+        firstSeqFile =
+          fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0];
+      }
+
+      SequenceFile.Reader r = null;
+      try {
+        r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf);
+        return r.getKeyClass().asSubclass(Writable.class);
+      } finally {
+        Closeables.closeQuietly(r);
+      }
+    }
+    throw new IOException("Unable to open input files to determine input label type.");
+  }
+
+  private static final Pattern OUTPUT_FILE_PATTERN =
+    Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?");
+
+  static final Comparator<FileStatus> PARTITION_COMPARATOR =
+    new Comparator<FileStatus>() {
+      private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher("");
+
+      @Override
+      public int compare(FileStatus o1, FileStatus o2) {
+        matcher.reset(o1.getPath().getName());
+        if (!matcher.matches()) {
+          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
+              + o1.getPath());
+        }
+        int p1 = Integer.parseInt(matcher.group(3));
+        matcher.reset(o2.getPath().getName());
+        if (!matcher.matches()) {
+          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
+              + o2.getPath());
+        }
+
+        int p2 = Integer.parseInt(matcher.group(3));
+        return p1 - p2;
+      }
+
+    };
+
+  /**
+   * Helper capability to load a distributed row matrix into a dense in-core
+   * array (mainly to support tests).
+   *
+   * @param fs
+   *          filesystem
+   * @param glob
+   *          FS glob
+   * @param conf
+   *          configuration
+   * @return Dense matrix array
+   * @throws IOException
+   *           when an I/O error occurs.
+   */
+  public static double[][] loadDistributedRowMatrix(FileSystem fs,
+                                                    Path glob,
+                                                    Configuration conf)
+    throws IOException {
+
+    FileStatus[] files = fs.globStatus(glob);
+    if (files == null) {
+      return null;
+    }
+
+    List<double[]> denseData = Lists.newArrayList();
+
+    /*
+     * assume it is partitioned output, so we need to read them up in order of
+     * partitions.
+     */
+    Arrays.sort(files, PARTITION_COMPARATOR);
+
+    for (FileStatus fstat : files) {
+      for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(fstat.getPath(),
+                                                                                true,
+                                                                                conf)) {
+        Vector v = value.get();
+        int size = v.size();
+        double[] row = new double[size];
+        for (int i = 0; i < size; i++) {
+          row[i] = v.get(i);
+        }
+        // ignore row label.
+        denseData.add(row);
+      }
+    }
+
+    return denseData.toArray(new double[denseData.size()][]);
+  }
+
+  /**
+   * Load multiple upper triangular matrices and sum them up.
+   *
+   * @param glob
+   * @param conf
+   * @return the sum of upper triangular inputs.
+   * @throws IOException
+   */
+  public static UpperTriangular
+      loadAndSumUpperTriangularMatrices(Path glob, Configuration conf)
+        throws IOException {
+    Vector v = loadAndSumUpVectors(glob, conf);
+    return v == null ? null : new UpperTriangular(v);
+  }
+
+  /**
+   * Returns the sum of all vectors stored in the files matched by the glob.
+   *
+   * @param glob
+   * @param conf
+   * @return the sum of the vectors found, or null if no vectors were found
+   * @throws IOException
+   */
+  public static Vector loadAndSumUpVectors(Path glob, Configuration conf)
+    throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> iter =
+      new SequenceFileDirValueIterator<VectorWritable>(glob,
+                                                       PathType.GLOB,
+                                                       null,
+                                                       PARTITION_COMPARATOR,
+                                                       true,
+                                                       conf);
+
+    try {
+      Vector v = null;
+      while (iter.hasNext()) {
+        if (v == null)
+          v = new DenseVector(iter.next().get());
+        else
+          v.assign(iter.next().get(), Functions.PLUS);
+      }
+      return v;
+
+    } finally {
+      Closeables.closeQuietly(iter);
+    }
+
+  }
+
+  /**
+   * Load only one upper triangular matrix and raise an error if more than one
+   * is found.
+   *
+   * @param fs
+   * @param glob
+   * @param conf
+   * @return the single upper triangular matrix found
+   * @throws IOException
+   */
+  public static UpperTriangular loadUpperTriangularMatrix(FileSystem fs,
+                                                          Path glob,
+                                                          Configuration conf)
+    throws IOException {
+
+    /*
+     * there may still be more than one file matched by the glob, and exactly
+     * one of them must contain the matrix.
+     */
+
+    SequenceFileDirValueIterator<VectorWritable> iter =
+      new SequenceFileDirValueIterator<VectorWritable>(glob,
+                                                       PathType.GLOB,
+                                                       null,
+                                                       null,
+                                                       true,
+                                                       conf);
+    try {
+      if (!iter.hasNext())
+        throw new IOException("No triangular matrices found");
+      Vector v = iter.next().get();
+      UpperTriangular result = new UpperTriangular(v);
+      if (iter.hasNext())
+        throw new IOException("Unexpected overrun in upper triangular matrix files");
+      return result;
+
+    } finally {
+      iter.close();
+    }
+  }
+
+  /**
+   * Extracts row-wise raw data from a Mahout matrix for 3rd-party solvers.
+   * Unfortunately, the values member is fully encapsulated in {@link DenseMatrix}
+   * at this point, so we have to resort to element-wise copying.
+   *
+   * @param m
+   * @return
+   */
+  public static double[][] extractRawData(Matrix m) {
+    int rows = m.numRows();
+    int cols = m.numCols();
+    double[][] result = new double[rows][];
+    for (int i = 0; i < rows; i++) {
+      result[i] = new double[cols];
+      for (int j = 0; j < cols; j++) {
+        result[i][j] = m.getQuick(i, j);
+      }
+    }
+    return result;
+  }
+
+}

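A minimal round-trip sketch of the vector I/O helpers added above may help when reading the rest of the patch. It uses only the public SSVDHelper.saveVector() and SSVDHelper.loadAndSumUpVectors() methods from this commit; the wrapper class name and the /tmp path are illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.stochasticsvd.SSVDHelper;

public class SSVDHelperVectorRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical location for a column-means (xi) vector
    Path xiPath = new Path("/tmp/ssvd-pca/xi.seq");

    // write the vector as a single SequenceFile
    Vector xi = new DenseVector(new double[] {1.0, 2.0, 3.0});
    SSVDHelper.saveVector(xi, xiPath, conf);

    // read it back; with a single part file the "sum" is just the vector itself
    Vector restored = SSVDHelper.loadAndSumUpVectors(xiPath, conf);
    System.out.println(restored);
  }
}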
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java Wed Feb 22 21:57:27 2012
@@ -19,33 +19,24 @@ package org.apache.mahout.math.hadoop.st
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Deque;
-import java.util.List;
 import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
 import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.DenseMatrix;
 import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.DistributedRowMatrixWriter;
+import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
 import org.apache.mahout.math.ssvd.EigenSolverWrapper;
 
 import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 
 /**
  * Stochastic SVD solver (API class).
@@ -96,7 +87,7 @@ import com.google.common.io.Closeables;
  */
 public class SSVDSolver {
 
-  private double[] svalues;
+  private Vector svalues;
   private boolean computeU = true;
   private boolean computeV = true;
   private String uPath;
@@ -118,6 +109,7 @@ public class SSVDSolver {
   private boolean cVHalfSigma;
   private boolean overwrite;
   private boolean broadcast = true;
+  private Path pcaMeanPath;
 
   /**
    * create new SSVD solver. Required parameters are passed to constructor to
@@ -216,7 +208,7 @@ public class SSVDSolver {
    * 
    * @return singlular values (largest to smallest)
    */
-  public double[] getSingularValues() {
+  public Vector getSingularValues() {
     return svalues;
   }
 
@@ -300,6 +292,30 @@ public class SSVDSolver {
   }
 
   /**
+   * Optional. Single-vector file path for a vector (aka xi in MAHOUT-817
+   * working notes) to be subtracted from each row of input.
+   * <P>
+   * 
+   * A brute-force approach would turn the input into a dense input, which is
+   * often undesirable. By supplying this offset to the SSVD solver, we can
+   * avoid most of the overhead caused by the increased input density.
+   * <P>
+   * 
+   * The size of this offset vector is n (the width of the A input). In PCA and
+   * R this is known as the "column means", but here it can be any row-vector
+   * offset to propagate into the SSVD solution.
+   * <P>
+   * 
+   */
+  public Path getPcaMeanPath() {
+    return pcaMeanPath;
+  }
+
+  public void setPcaMeanPath(Path pcaMeanPath) {
+    this.pcaMeanPath = pcaMeanPath;
+  }
+
+  /**
    * run all SSVD jobs.
    * 
    * @throws IOException
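A hypothetical caller-side sketch of the new PCA option may clarify how the pieces above fit together. Only setPcaMeanPath() and the Vector-returning getSingularValues() come from this patch; the run() call, the solver construction, and the xi path are assumptions/placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;

public class PcaSsvdSketch {
  /** Runs an already-configured solver with a column-means offset xi (path is illustrative). */
  public static Vector runWithPcaOffset(SSVDSolver solver, Path xiPath) throws Exception {
    solver.setPcaMeanPath(xiPath);       // single-vector file of width n
    solver.run();                        // runs all SSVD jobs, PCA-adjusted
    return solver.getSingularValues();   // singular values, largest to smallest
  }
}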
@@ -310,8 +326,8 @@ public class SSVDSolver {
     Deque<Closeable> closeables = Lists.newLinkedList();
     try {
       Class<? extends Writable> labelType =
-        sniffInputLabelType(inputPath, conf);
-      FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
+        SSVDHelper.sniffInputLabelType(inputPath, conf);
+      FileSystem fs = FileSystem.get(conf);
 
       Path qPath = new Path(outputPath, "Q-job");
       Path btPath = new Path(outputPath, "Bt-job");
@@ -320,15 +336,47 @@ public class SSVDSolver {
       Path uPath = new Path(outputPath, "U");
       Path vPath = new Path(outputPath, "V");
 
+      Path pcaBasePath = new Path(outputPath, "pca");
+      Path sbPath = null;
+      Path sqPath = null;
+
+      double xisquaredlen = 0;
+
+      if (pcaMeanPath != null)
+        fs.mkdirs(pcaBasePath);
+      Random rnd = RandomUtils.getRandom();
+      long seed = rnd.nextLong();
+
+      if (pcaMeanPath != null) {
+        /*
+         * compute s_b0 if the pca offset is present.
+         *
+         * Just in case, we treat the xi path as possible multi-part output (e.g.
+         * multiple reduce task outputs) whose partial components need to be
+         * summed up. If it is just one file, that works too.
+         */
+
+        Vector xi = SSVDHelper.loadAndSumUpVectors(pcaMeanPath, conf);
+        xisquaredlen = xi.dot(xi);
+        Omega omega = new Omega(seed, k + p);
+        Vector s_b0 = omega.mutlithreadedTRightMultiply(xi);
+
+        SSVDHelper.saveVector(s_b0, sbPath =
+          new Path(pcaBasePath, "somega.seq"), conf);
+      }
+
       if (overwrite) {
         fs.delete(outputPath, true);
       }
 
-      Random rnd = RandomUtils.getRandom();
-      long seed = rnd.nextLong();
+      /*
+       * if we work with a pca offset, we need to precompute s_bq0 (aka s_omega)
+       * for the jobs to use.
+       */
 
       QJob.run(conf,
                inputPath,
+               sbPath,
                qPath,
                ablockRows,
                minSplitSize,
@@ -344,9 +392,13 @@ public class SSVDSolver {
       * bit too many (I would be happy if that were ever the case though).
        */
 
+      sbPath = new Path(pcaBasePath, "sb0");
+      sqPath = new Path(pcaBasePath, "sq0");
+
       BtJob.run(conf,
                 inputPath,
                 qPath,
+                pcaMeanPath,
                 btPath,
                 minSplitSize,
                 k,
@@ -357,6 +409,9 @@ public class SSVDSolver {
                 labelType,
                 q <= 0);
 
+      sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+      sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
+
       // power iterations
       for (int i = 0; i < q; i++) {
 
@@ -365,6 +420,9 @@ public class SSVDSolver {
         ABtDenseOutJob.run(conf,
                            inputPath,
                            btPathGlob,
+                           pcaMeanPath,
+                           sqPath,
+                           sbPath,
                            qPath,
                            ablockRows,
                            minSplitSize,
@@ -379,6 +437,7 @@ public class SSVDSolver {
         BtJob.run(conf,
                   inputPath,
                   qPath,
+                  pcaMeanPath,
                   btPath,
                   minSplitSize,
                   k,
@@ -388,103 +447,92 @@ public class SSVDSolver {
                   broadcast,
                   labelType,
                   i == q - 1);
+        sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+        sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
       }
 
-      UpperTriangular bbt =
-        loadAndSumUpperTriangularMatrices(fs, new Path(btPath, BtJob.OUTPUT_BBT
-            + "-*"), conf);
+      UpperTriangular bbtTriangular =
+        SSVDHelper.loadAndSumUpperTriangularMatrices(new Path(btPath,
+                                                              BtJob.OUTPUT_BBT
+                                                                  + "-*"), conf);
 
       // convert bbt to something our eigensolver could understand
-      assert bbt.columnSize() == k + p;
+      assert bbtTriangular.columnSize() == k + p;
 
-      double[][] bbtSquare = new double[k + p][];
-      for (int i = 0; i < k + p; i++) {
-        bbtSquare[i] = new double[k + p];
-      }
+      /*
+       * we currently use a 3rd-party in-core eigensolver, so we just need a
+       * dense array representation for it.
+       */
+      DenseMatrix bbtSquare = new DenseMatrix(k + p, k + p);
 
       for (int i = 0; i < k + p; i++) {
         for (int j = i; j < k + p; j++) {
-          bbtSquare[i][j] = bbtSquare[j][i] = bbt.getQuick(i, j);
+          double val = bbtTriangular.getQuick(i, j);
+          bbtSquare.setQuick(i, j, val);
+          bbtSquare.setQuick(j, i, val);
         }
       }
+      
+      // MAHOUT-817
+      if (pcaMeanPath != null) {
+        Vector sq = SSVDHelper.loadAndSumUpVectors(sqPath, conf);
+        Vector sb = SSVDHelper.loadAndSumUpVectors(sbPath, conf);
+        Matrix mC = sq.cross(sb);
 
-      svalues = new double[k + p];
+        bbtSquare.assign(mC, Functions.MINUS);
+        bbtSquare.assign(mC.transpose(), Functions.MINUS);
+        mC = null;
 
-      // try something else.
-      EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtSquare);
+        Matrix outerSq = sq.cross(sq);
+        outerSq.assign(Functions.mult(xisquaredlen));
+        bbtSquare.assign(outerSq, Functions.PLUS);
 
-      double[] eigenva2 = eigenWrapper.getEigenValues();
-      for (int i = 0; i < k + p; i++) {
-        svalues[i] = Math.sqrt(eigenva2[i]); // sqrt?
-      }
-
-      // save/redistribute UHat
-      double[][] uHat = eigenWrapper.getUHat();
-
-      fs.mkdirs(uHatPath);
-      SequenceFile.Writer uHatWriter =
-        SequenceFile.createWriter(fs,
-                                  conf,
-                                  uHatPath = new Path(uHatPath, "uhat.seq"),
-                                  IntWritable.class,
-                                  VectorWritable.class,
-                                  CompressionType.BLOCK);
-      closeables.addFirst(uHatWriter);
-
-      int m = uHat.length;
-      IntWritable iw = new IntWritable();
-      VectorWritable vw = new VectorWritable();
-      for (int i = 0; i < m; i++) {
-        vw.set(new DenseVector(uHat[i], true));
-        iw.set(i);
-        uHatWriter.append(iw, vw);
       }
 
-      closeables.remove(uHatWriter);
-      Closeables.closeQuietly(uHatWriter);
+      EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(SSVDHelper.extractRawData(bbtSquare));
+      Matrix uHat = new DenseMatrix(eigenWrapper.getUHat());
+      svalues = new DenseVector(eigenWrapper.getEigenValues());
 
-      SequenceFile.Writer svWriter =
-        SequenceFile.createWriter(fs,
-                                  conf,
-                                  svPath = new Path(svPath, "svalues.seq"),
-                                  IntWritable.class,
-                                  VectorWritable.class,
-                                  CompressionType.BLOCK);
+      svalues.assign(Functions.SQRT);
 
-      closeables.addFirst(svWriter);
-
-      vw.set(new DenseVector(svalues, true));
-      svWriter.append(iw, vw);
+      // save/redistribute UHat
+      fs.mkdirs(uHatPath);
+      DistributedRowMatrixWriter.write(uHatPath =
+        new Path(uHatPath, "uhat.seq"), conf, uHat);
 
-      closeables.remove(svWriter);
-      Closeables.closeQuietly(svWriter);
+      // save sigma.
+      SSVDHelper.saveVector(svalues,
+                            svPath = new Path(svPath, "svalues.seq"),
+                            conf);
 
       UJob ujob = null;
       if (computeU) {
         ujob = new UJob();
-        ujob.start(conf,
-                   new Path(btPath, BtJob.OUTPUT_Q + "-*"),
-                   uHatPath,
-                   svPath,
-                   uPath,
-                   k,
-                   reduceTasks,
-                   labelType,
-                   cUHalfSigma);
+        ujob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                 uHatPath,
+                 svPath,
+                 uPath,
+                 k,
+                 reduceTasks,
+                 labelType,
+                 cUHalfSigma);
         // actually this is map-only job anyway
       }
 
       VJob vjob = null;
       if (computeV) {
         vjob = new VJob();
-        vjob.start(conf,
-                   new Path(btPath, BtJob.OUTPUT_BT + "-*"),
-                   uHatPath,
-                   svPath,
-                   vPath,
-                   k,
-                   reduceTasks,
-                   cVHalfSigma);
+        vjob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                 pcaMeanPath,
+                 sqPath,
+                 uHatPath,
+                 svPath,
+                 vPath,
+                 k,
+                 reduceTasks,
+                 cVHalfSigma);
       }
 
       if (ujob != null) {
@@ -506,195 +554,4 @@ public class SSVDSolver {
     }
 
   }
-
-  private static Class<? extends Writable>
-      sniffInputLabelType(Path[] inputPath, Configuration conf)
-        throws IOException {
-    FileSystem fs = FileSystem.get(inputPath[0].toUri(), conf);
-    for (Path p : inputPath) {
-      FileStatus[] fstats = fs.globStatus(p);
-      if (fstats == null || fstats.length == 0) {
-        continue;
-      }
-
-      FileStatus firstSeqFile;
-      if (fstats[0].isDir()) {
-        firstSeqFile = fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0];
-      } else {
-        firstSeqFile = fstats[0];
-      }
-
-      SequenceFile.Reader r = null;
-      try {
-        r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf);
-        return r.getKeyClass().asSubclass(Writable.class);
-      } finally {
-        Closeables.closeQuietly(r);
-      }
-    }
-    throw new IOException("Unable to open input files to determine input label type.");
-  }
-
-  private static final Pattern OUTPUT_FILE_PATTERN =
-    Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?");
-
-  static final Comparator<FileStatus> PARTITION_COMPARATOR =
-    new Comparator<FileStatus>() {
-      private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher("");
-
-      @Override
-      public int compare(FileStatus o1, FileStatus o2) {
-        matcher.reset(o1.getPath().getName());
-        if (!matcher.matches()) {
-          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
-              + o1.getPath());
-        }
-        int p1 = Integer.parseInt(matcher.group(3));
-        matcher.reset(o2.getPath().getName());
-        if (!matcher.matches()) {
-          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
-              + o2.getPath());
-        }
-
-        int p2 = Integer.parseInt(matcher.group(3));
-        return p1 - p2;
-      }
-
-    };
-
-  /**
-   * helper capabiltiy to load distributed row matrices into dense matrix (to
-   * support tests mainly).
-   * 
-   * @param fs
-   *          filesystem
-   * @param glob
-   *          FS glob
-   * @param conf
-   *          configuration
-   * @return Dense matrix array
-   * @throws IOException
-   *           when I/O occurs.
-   */
-  public static double[][] loadDistributedRowMatrix(FileSystem fs,
-                                                    Path glob,
-                                                    Configuration conf)
-    throws IOException {
-
-    FileStatus[] files = fs.globStatus(glob);
-    if (files == null) {
-      return null;
-    }
-
-    List<double[]> denseData = Lists.newArrayList();
-
-    /*
-     * assume it is partitioned output, so we need to read them up in order of
-     * partitions.
-     */
-    Arrays.sort(files, PARTITION_COMPARATOR);
-
-    for (FileStatus fstat : files) {
-      for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(fstat.getPath(),
-                                                                                true,
-                                                                                conf)) {
-        Vector v = value.get();
-        int size = v.size();
-        double[] row = new double[size];
-        for (int i = 0; i < size; i++) {
-          row[i] = v.get(i);
-        }
-        // ignore row label.
-        denseData.add(row);
-      }
-    }
-
-    return denseData.toArray(new double[denseData.size()][]);
-  }
-
-  /**
-   * Load multiplel upper triangular matrices and sum them up.
-   * 
-   * @param fs
-   * @param glob
-   * @param conf
-   * @return the sum of upper triangular inputs.
-   * @throws IOException
-   */
-  public static UpperTriangular
-      loadAndSumUpperTriangularMatrices(FileSystem fs,
-                                        Path glob,
-                                        Configuration conf) throws IOException {
-
-    FileStatus[] files = fs.globStatus(glob);
-    if (files == null) {
-      return null;
-    }
-
-    /*
-     * assume it is partitioned output, so we need to read them up in order of
-     * partitions.
-     */
-    Arrays.sort(files, PARTITION_COMPARATOR);
-
-    DenseVector result = null;
-    for (FileStatus fstat : files) {
-      for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(fstat.getPath(),
-                                                                                true,
-                                                                                conf)) {
-        Vector v = value.get();
-        if (result == null) {
-          result = new DenseVector(v);
-        } else {
-          result.addAll(v);
-        }
-      }
-    }
-
-    if (result == null) {
-      throw new IOException("Unexpected underrun in upper triangular matrix files");
-    }
-    return new UpperTriangular(result);
-  }
-
-  /**
-   * Load only one upper triangular matrix and issue error if mroe than one is
-   * found.
-   */
-  public static UpperTriangular loadUpperTriangularMatrix(FileSystem fs,
-                                                          Path glob,
-                                                          Configuration conf)
-    throws IOException {
-
-    FileStatus[] files = fs.globStatus(glob);
-    if (files == null) {
-      return null;
-    }
-
-    /*
-     * assume it is partitioned output, so we need to read them up in order of
-     * partitions.
-     */
-    Arrays.sort(files, PARTITION_COMPARATOR);
-
-    UpperTriangular result = null;
-    for (FileStatus fstat : files) {
-      for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(fstat.getPath(),
-                                                                                true,
-                                                                                conf)) {
-        Vector v = value.get();
-        if (result == null) {
-          result = new UpperTriangular(v);
-        } else {
-          throw new IOException("Unexpected overrun in upper triangular matrix files");
-        }
-      }
-    }
-
-    if (result == null) {
-      throw new IOException("Unexpected underrun in upper triangular matrix files");
-    }
-    return result;
-  }
-
 }

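For readers following the MAHOUT-817 algebra, here is the PCA correction to B*B^T isolated from run() above. The body mirrors the patched code; the class/method wrapper and parameter names are illustrative only.

import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class PcaBbtCorrection {
  /**
   * Applies, in place:
   *   BB^T <- BB^T - (sq x sb) - (sq x sb)^T + ||xi||^2 * (sq x sq)
   * where sq and sb are the summed s_q and s_b vectors and xiSquaredLen = xi.dot(xi).
   */
  public static Matrix correct(Matrix bbtSquare, Vector sq, Vector sb, double xiSquaredLen) {
    Matrix mC = sq.cross(sb);                  // outer product sq x sb
    bbtSquare.assign(mC, Functions.MINUS);
    bbtSquare.assign(mC.transpose(), Functions.MINUS);

    Matrix outerSq = sq.cross(sq);             // outer product sq x sq
    outerSq.assign(Functions.mult(xiSquaredLen));
    bbtSquare.assign(outerSq, Functions.PLUS);
    return bbtSquare;
  }
}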
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java Wed Feb 22 21:57:27 2012
@@ -37,6 +37,7 @@ import org.apache.mahout.math.DenseVecto
 import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
 
 /**
  * Computes U=Q*Uhat of SSVD (optionally adding x pow(Sigma, 0.5) )
@@ -51,7 +52,7 @@ public class UJob {
 
   private Job job;
 
-  public void start(Configuration conf, Path inputPathQ, Path inputUHatPath,
+  public void run(Configuration conf, Path inputPathQ, Path inputUHatPath,
       Path sigmaPath, Path outputPath, int k, int numReduceTasks,
       Class<? extends Writable> labelClass, boolean uHalfSigma)
     throws ClassNotFoundException, InterruptedException, IOException {
@@ -135,7 +136,7 @@ public class UJob {
       Path sigmaPath = new Path(context.getConfiguration().get(PROP_SIGMA_PATH));
       FileSystem fs = FileSystem.get(uHatPath.toUri(), context.getConfiguration());
 
-      uHat = new DenseMatrix(SSVDSolver.loadDistributedRowMatrix(fs,
+      uHat = new DenseMatrix(SSVDHelper.loadDistributedRowMatrix(fs,
           uHatPath, context.getConfiguration()));
       // since uHat is (k+p) x (k+p)
       kp = uHat.columnSize();
@@ -144,11 +145,8 @@ public class UJob {
       uRowWritable = new VectorWritable(uRow);
 
       if (context.getConfiguration().get(PROP_U_HALFSIGMA) != null) {
-        sValues = new DenseVector(SSVDSolver.loadDistributedRowMatrix(fs,
-            sigmaPath, context.getConfiguration())[0], true);
-        for (int i = 0; i < k; i++) {
-          sValues.setQuick(i, Math.sqrt(sValues.getQuick(i)));
-        }
+        sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+        sValues.assign(Functions.SQRT);
       }
 
     }

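The UJob change above replaces the manual square-root loop with Mahout's functional assign. A minimal equivalent sketch (vector values illustrative):

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class SqrtAssignSketch {
  public static void main(String[] args) {
    Vector sValues = new DenseVector(new double[] {9.0, 4.0, 1.0});
    sValues.assign(Functions.SQRT);   // element-wise sqrt, same effect as the removed loop
    System.out.println(sValues);
  }
}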

