Subject: svn commit: r1292532 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/math/hadoop/ main/java/org/apache/mahout/math/hadoop/stochasticsvd/ test/java/org/apache/mahout/math/hadoop/ test/java/org/apache/mahout/math/hadoop/stochasticsvd/
Date: Wed, 22 Feb 2012 21:57:28 -0000
To: commits@mahout.apache.org
From: dlyubimov@apache.org

Author: dlyubimov
Date: Wed Feb 22 21:57:27 2012
New Revision: 1292532

URL: http://svn.apache.org/viewvc?rev=1292532&view=rev
Log:
MAHOUT-817 PCA options for SSVD (RC1)

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCADenseTest.java
      - copied, changed from r1245615, mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCommonTest.java
      - copied, changed from r1245615, mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDTestsHelper.java Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java Wed Feb 22 21:57:27 2012 @@ -17,8 +17,12 @@ package org.apache.mahout.math.hadoop; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,10 +43,8 @@ import org.apache.mahout.math.VectorWrit import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Iterator; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; /** * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a @@ -64,7 +66,7 @@ import java.util.Iterator; */ public class DistributedRowMatrix implements VectorIterable, Configurable { public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files"; - + private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class); private final Path inputPath; @@ -176,13 +178,13 @@ public class DistributedRowMatrix implem throw new CardinalityException(numRows, other.numRows()); } Path outPath = new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF)); - + Configuration initialConf = getConf() == null ? 
new Configuration() : getConf(); Configuration conf = - MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, - rowPath, - other.rowPath, - outPath, + MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, + rowPath, + other.rowPath, + outPath, other.numCols); JobClient.runJob(new JobConf(conf)); DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols()); @@ -190,6 +192,39 @@ public class DistributedRowMatrix implem return out; } + public Vector columnMeans() throws IOException, InterruptedException, ClassNotFoundException, + IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException, + InvocationTargetException, NoSuchMethodException { + return columnMeans("SequentialAccessSparseVector"); + } + + /** + * Returns the column-wise mean of a DistributedRowMatrix + * + * @param vectorClass + * desired class for the column-wise mean vector e.g. + * RandomAccessSparseVector, DenseVector + * @return Vector containing the column-wise mean of this + */ + public Vector columnMeans(String vectorClass) throws IOException, + InterruptedException, IllegalArgumentException, SecurityException, + ClassNotFoundException, InstantiationException, IllegalAccessException, + InvocationTargetException, NoSuchMethodException { + Path outputVectorTmpPath = + new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + Configuration initialConf = + getConf() == null ? new Configuration() : getConf(); + String vectorClassFull = "org.apache.mahout.math." + vectorClass; + Vector mean = + MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, + vectorClassFull); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return mean; + } + public DistributedRowMatrix transpose() throws IOException { Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF)); Configuration initialConf = getConf() == null ? new Configuration() : getConf(); @@ -207,7 +242,7 @@ public class DistributedRowMatrix implem Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); Configuration conf = - TimesSquaredJob.createTimesJobConf(initialConf, + TimesSquaredJob.createTimesJobConf(initialConf, v, numRows, rowPath, @@ -325,7 +360,7 @@ public class DistributedRowMatrix implem col = in.readInt(); val = in.readDouble(); } - + @Override public String toString() { return "(" + row + ',' + col + "):" + val; Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java?rev=1292532&view=auto ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java (added) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java Wed Feb 22 21:57:27 2012 @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.mahout.math.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.io.Closeables; + +/** + * MatrixColumnMeansJob is a job for calculating the column-wise mean of a + * DistributedRowMatrix. This job can be accessed using + * DistributedRowMatrix.columnMeans() + */ +public class MatrixColumnMeansJob { + + public static final String VECTOR_CLASS = + "DistributedRowMatrix.columnMeans.vector.class"; + + public static Vector run(Configuration conf, + Path inputPath, + Path outputVectorTmpPath) throws IOException { + return run(conf, inputPath, outputVectorTmpPath); + } + + /** + * Job for calculating column-wise mean of a DistributedRowMatrix + * + * @param initialConf + * @param inputPath + * path to DistributedRowMatrix input + * @param tmpPath + * path for temporary files created during job + * @param vectorClass + * String of desired class for returned vector e.g. DenseVector, + * RandomAccessSparseVector (may be null for {@link DenseVector} ) + * @return Vector containing column-wise mean of DistributedRowMatrix + */ + public static Vector run(Configuration initialConf, + Path inputPath, + Path outputVectorTmpPath, + String vectorClass) throws IOException { + + try { + initialConf.set(VECTOR_CLASS, + vectorClass == null ? 
DenseVector.class.getName() + : vectorClass); + + @SuppressWarnings("deprecation") + JobConf oldApiConf = new JobConf(initialConf); + + org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(oldApiConf, + outputVectorTmpPath); + Job job = new Job(initialConf); + outputVectorTmpPath.getFileSystem(job.getConfiguration()) + .delete(outputVectorTmpPath, true); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + FileInputFormat.addInputPath(job, inputPath); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + + job.setMapperClass(MatrixColumnMeansMapper.class); + job.setReducerClass(MatrixColumnMeansReducer.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + job.submit(); + job.waitForCompletion(true); + + Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000"); + SequenceFileValueIterator iterator = + new SequenceFileValueIterator(tmpFile, true, oldApiConf); + try { + if (iterator.hasNext()) { + return iterator.next().get(); + } else { + return (Vector) Class.forName(vectorClass).getConstructor(int.class) + .newInstance(0); + } + } finally { + Closeables.closeQuietly(iterator); + } + } catch (Throwable thr) { + if (thr instanceof IOException) + throw (IOException) thr; + else + throw new IOException(thr); + } + } + + /** + * Mapper for calculation of column-wise mean. + */ + public static class MatrixColumnMeansMapper extends + Mapper { + + private Vector runningSum = null; + private String vectorClass = null; + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + /** + * The mapper computes a running sum of the vectors the task has seen. + * Element 0 of the running sum vector contains a count of the number of + * vectors that have been seen. The remaining elements contain the + * column-wise running sum. Nothing is written at this stage + */ + @Override + public void map(IntWritable r, VectorWritable v, Context context) + throws IOException { + if (runningSum == null) { + try { + /* + * If this is the first vector the mapper has seen, instantiate a new + * vector using the parameter VECTOR_CLASS + */ + runningSum = + (Vector) Class.forName(vectorClass).getConstructor(int.class) + .newInstance(v.get().size() + 1); + } catch (Exception e) { + e.printStackTrace(); + } + runningSum.set(0, 1); + runningSum.viewPart(1, v.get().size()).assign(v.get()); + } else { + runningSum.set(0, runningSum.get(0) + 1); + runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS); + } + } + + /** + * The column-wise sum is written at the cleanup stage. A single reducer is + * forced so null can be used for the key + */ + @Override + public void cleanup(Context context) throws InterruptedException, + IOException { + if (runningSum != null) { + context.write(NullWritable.get(), new VectorWritable(runningSum)); + } + } + + } + + /** + * The reducer adds the partial column-wise sums from each of the mappers to + * compute the total column-wise sum. The total sum is then divided by the + * total count of vectors to determine the column-wise mean. 
+ */ + public static class MatrixColumnMeansReducer extends + Reducer { + + private static final IntWritable one = new IntWritable(1); + private String vectorClass = null; + Vector outputVector = null; + VectorWritable outputVectorWritable = new VectorWritable(); + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + @Override + public void reduce(NullWritable n, + Iterable vectors, + Context context) throws IOException, + InterruptedException { + + /** + * Add together partial column-wise sums from mappers + */ + for (VectorWritable v : vectors) { + if (outputVector == null) { + outputVector = v.get(); + } else { + outputVector.assign(v.get(), Functions.PLUS); + } + } + + /** + * Divide total column-wise sum by count of vectors, which corresponds to + * the number of rows in the DistributedRowMatrix + */ + if (outputVector != null) { + outputVectorWritable.set(outputVector.viewPart(1, + outputVector.size() - 1) + .divide(outputVector.get(0))); + context.write(one, outputVectorWritable); + } else { + try { + Vector emptyVector = + (Vector) Class.forName(vectorClass).getConstructor(int.class) + .newInstance(0); + context.write(one, new VectorWritable(emptyVector)); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + +} Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java Wed Feb 22 21:57:27 2012 @@ -54,6 +54,7 @@ import org.apache.mahout.math.DenseVecto import org.apache.mahout.math.SequentialAccessSparseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep; /** @@ -65,6 +66,9 @@ public class ABtDenseOutJob { public static final String PROP_BT_PATH = "ssvd.Bt.path"; public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast"; + public static final String PROP_SB_PATH = "ssvdpca.sb.path"; + public static final String PROP_SQ_PATH = "ssvdpca.sq.path"; + public static final String PROP_XI_PATH = "ssvdpca.xi.path"; private ABtDenseOutJob() { } @@ -99,6 +103,11 @@ public class ABtDenseOutJob { private boolean distributedBt; private Path[] btLocalPath; private Configuration localFsConfig; + /* + * xi and s_q are PCA-related corrections, per MAHOUT-817 + */ + protected Vector xi; + protected Vector sq; @Override protected void map(Writable key, VectorWritable value, Context context) @@ -257,8 +266,26 @@ public class ABtDenseOutJob { /* * assume btVec is dense */ - for (int s = 0; s < kp; s++) { - yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s); + if (xi != null) { + /* + * MAHOUT-817: PCA correction for B'. I rewrite the whole + * computation loop so i don't have to check if PCA correction + * is needed at individual element level. It looks bulkier this + * way but perhaps less wasteful on cpu. + */ + for (int s = 0; s < kp; s++) { + // code defensively against shortened xi + double xii = xi.size() > btIndex ? 
xi.get(btIndex) : 0.0; + yiCols[s][j - aRowBegin] += + aEl.get() * (btVec.getQuick(s) - xii * sq.get(s)); + } + } else { + /* + * no PCA correction + */ + for (int s = 0; s < kp; s++) { + yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s); + } } } @@ -287,27 +314,31 @@ public class ABtDenseOutJob { protected void setup(Context context) throws IOException, InterruptedException { - int k = - Integer.parseInt(context.getConfiguration().get(QRFirstStep.PROP_K)); - int p = - Integer.parseInt(context.getConfiguration().get(QRFirstStep.PROP_P)); + Configuration conf = context.getConfiguration(); + int k = Integer.parseInt(conf.get(QRFirstStep.PROP_K)); + int p = Integer.parseInt(conf.get(QRFirstStep.PROP_P)); kp = k + p; outKey = new SplitPartitionedWritable(context); - blockHeight = - context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, - -1); - distributedBt = context.getConfiguration().get(PROP_BT_BROADCAST) != null; + blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1); + distributedBt = conf.get(PROP_BT_BROADCAST) != null; if (distributedBt) { - - btLocalPath = - DistributedCache.getLocalCacheFiles(context.getConfiguration()); - + btLocalPath = DistributedCache.getLocalCacheFiles(conf); localFsConfig = new Configuration(); localFsConfig.set("fs.default.name", "file:///"); } + /* + * PCA -related corrections (MAHOUT-817) + */ + String xiPathStr = conf.get(PROP_XI_PATH); + if (xiPathStr != null) { + xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf); + sq = + SSVDHelper.loadAndSumUpVectors(new Path(conf.get(PROP_SQ_PATH)), conf); + } + } } @@ -346,14 +377,21 @@ public class ABtDenseOutJob { protected OutputCollector rhatCollector; protected QRFirstStep qr; protected Vector yiRow; + protected Vector sb; @Override protected void setup(Context context) throws IOException, InterruptedException { - blockHeight = - context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, - -1); - + Configuration conf = context.getConfiguration(); + blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1); + String sbPathStr = conf.get(PROP_SB_PATH); + + /* + * PCA -related corrections (MAHOUT-817) + */ + if (sbPathStr != null) { + sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf); + } } protected void setupBlock(Context context, SplitPartitionedWritable spw) @@ -407,6 +445,12 @@ public class ABtDenseOutJob { } key.setTaskItemOrdinal(blockBase + k); + + // pca offset correction if any + if (sb != null) { + yiRow.assign(sb, Functions.MINUS); + } + qr.collect(key, yiRow); } @@ -466,6 +510,9 @@ public class ABtDenseOutJob { public static void run(Configuration conf, Path[] inputAPaths, Path inputBtGlob, + Path xiPath, + Path sqPath, + Path sbPath, Path outputPath, int aBlockRows, int minSplitSize, @@ -509,6 +556,15 @@ public class ABtDenseOutJob { job.getConfiguration().setInt(QRFirstStep.PROP_P, p); job.getConfiguration().set(PROP_BT_PATH, inputBtGlob.toString()); + /* + * PCA-related options, MAHOUT-817 + */ + if (xiPath != null) { + job.getConfiguration().set(PROP_XI_PATH, xiPath.toString()); + job.getConfiguration().set(PROP_SB_PATH, sbPath.toString()); + job.getConfiguration().set(PROP_SQ_PATH, sqPath.toString()); + } + job.setNumReduceTasks(numReduceTasks); // broadcast Bt files if required. 
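
[Editor's note] The PCA handling in ABtDenseOutJob above folds the mean offset into the element-wise accumulation so that the sparse rows of A never have to be densified; the remaining s_b term is subtracted later, in the reducer, as the diff shows. The following is a hypothetical, self-contained sketch of that inner loop on plain Java arrays; the names (kp, xi, sq, yRow) only mirror the variables in the diff and are not part of this commit.

import java.util.Map;

/*
 * Sketch only: accumulate one output row the way the mapper's inner loop does,
 * i.e. apply the xi/s_q correction per non-zero element of A instead of
 * materializing the dense, mean-subtracted matrix A - 1*xi'.
 */
final class PcaCorrectedAccumulatorSketch {

  static void accumulateRow(Map<Integer, Double> sparseARow, // column index -> value of one A row
                            double[][] bt,                   // Bt rows, each of length kp
                            double[] xi,                     // PCA offset (column means), may be short
                            double[] sq,                     // s_q correction vector, length kp
                            double[] yRow) {                 // output accumulator, length kp
    for (Map.Entry<Integer, Double> e : sparseARow.entrySet()) {
      int i = e.getKey();
      double a = e.getValue();
      // code defensively against a shortened xi, like the mapper does
      double xii = i < xi.length ? xi[i] : 0.0;
      for (int s = 0; s < yRow.length; s++) {
        yRow[s] += a * (bt[i][s] - xii * sq[s]);
      }
    }
  }
}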
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Wed Feb 22 21:57:27 2012 @@ -51,6 +51,8 @@ import org.apache.mahout.common.iterator import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.function.PlusMult; import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRLastStep; /** @@ -80,6 +82,9 @@ public final class BtJob { public static final String OUTPUT_Q = "Q"; public static final String OUTPUT_BT = "part"; public static final String OUTPUT_BBT = "bbt"; + public static final String OUTPUT_SQ = "sq"; + public static final String OUTPUT_SB = "sb"; + public static final String PROP_QJOB_PATH = "ssvd.QJob.path"; public static final String PROP_OUPTUT_BBT_PRODUCTS = "ssvd.BtJob.outputBBtProducts"; @@ -87,6 +92,8 @@ public final class BtJob { "ssvd.outerProdBlockHeight"; public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast"; + public static final String PROP_XI_PATH = "ssvdpca.xi.path"; + static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1; private BtJob() { @@ -105,16 +112,9 @@ public final class BtJob { private SparseRowBlockAccumulator btCollector; private Context mapContext; - @Override - protected void cleanup(Context context) throws IOException, - InterruptedException { - IOUtils.close(closeables); - } - - @SuppressWarnings("unchecked") - private void outputQRow(Writable key, Writable value) throws IOException { - outputs.getCollector(OUTPUT_Q, null).collect(key, value); - } + // pca stuff + private Vector sqAccum; + private boolean computeSq; /** * We maintain A and QtHat inputs partitioned the same way, so we @@ -136,6 +136,14 @@ public final class BtJob { // make sure Qs are inheriting A row labels. outputQRow(key, qRowValue); + // MAHOUT-817 + if (computeSq) { + if (sqAccum == null) { + sqAccum = new DenseVector(kp); + } + sqAccum.assign(qRow, Functions.PLUS); + } + if (btRow == null) { btRow = new DenseVector(kp); } @@ -166,7 +174,9 @@ public final class BtJob { InterruptedException { super.setup(context); - Path qJobPath = new Path(context.getConfiguration().get(PROP_QJOB_PATH)); + Configuration conf = context.getConfiguration(); + + Path qJobPath = new Path(conf.get(PROP_QJOB_PATH)); /* * actually this is kind of dangerous because this routine thinks we need @@ -182,37 +192,35 @@ public final class BtJob { SequenceFileValueIterator qhatInput = new SequenceFileValueIterator(qInputPath, true, - context.getConfiguration()); + conf); closeables.addFirst(qhatInput); /* * read all r files _in order of task ids_, i.e. partitions (aka group * nums). - * + * * Note: if broadcast option is used, this comes from distributed cache * files rather than hdfs path. 
*/ SequenceFileDirValueIterator rhatInput; - boolean distributedRHat = - context.getConfiguration().get(PROP_RHAT_BROADCAST) != null; + boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null; if (distributedRHat) { - Path[] rFiles = - DistributedCache.getLocalCacheFiles(context.getConfiguration()); + Path[] rFiles = DistributedCache.getLocalCacheFiles(conf); Validate.notNull(rFiles, "no RHat files in distributed cache job definition"); - Configuration conf = new Configuration(); - conf.set("fs.default.name", "file:///"); + Configuration lconf = new Configuration(); + lconf.set("fs.default.name", "file:///"); rhatInput = new SequenceFileDirValueIterator(rFiles, - SSVDSolver.PARTITION_COMPARATOR, + SSVDHelper.PARTITION_COMPARATOR, true, - conf); + lconf); } else { Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*"); @@ -220,15 +228,15 @@ public final class BtJob { new SequenceFileDirValueIterator(rPath, PathType.GLOB, null, - SSVDSolver.PARTITION_COMPARATOR, + SSVDHelper.PARTITION_COMPARATOR, true, - context.getConfiguration()); + conf); } Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!"); closeables.addFirst(rhatInput); - outputs = new MultipleOutputs(new JobConf(context.getConfiguration())); + outputs = new MultipleOutputs(new JobConf(conf)); closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs)); qr = new QRLastStep(qhatInput, rhatInput, blockNum); @@ -258,12 +266,36 @@ public final class BtJob { }; btCollector = - new SparseRowBlockAccumulator(context.getConfiguration() - .getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, - -1), - btBlockCollector); + new SparseRowBlockAccumulator(conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, + -1), btBlockCollector); closeables.addFirst(btCollector); + // MAHOUT-817 + computeSq = (conf.get(PROP_XI_PATH) != null); + + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + try { + if (sqAccum != null) { + /* + * hack: we will output sq partial sums with index -1 for summation. 
+ */ + SparseRowBlockWritable sbrw = new SparseRowBlockWritable(1); + sbrw.plusRow(0, sqAccum); + LongWritable lw = new LongWritable(-1); + context.write(lw, sbrw); + } + } finally { + IOUtils.close(closeables); + } + } + + @SuppressWarnings("unchecked") + private void outputQRow(Writable key, Writable value) throws IOException { + outputs.getCollector(OUTPUT_Q, null).collect(key, value); } } @@ -316,28 +348,40 @@ public final class BtJob { private final IntWritable btKey = new IntWritable(); private final VectorWritable btValue = new VectorWritable(); + // MAHOUT-817 + private Vector xi; + private final PlusMult pmult = new PlusMult(0); + private Vector sbAccum; + @Override protected void setup(Context context) throws IOException, InterruptedException { - blockHeight = - context.getConfiguration().getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1); + Configuration conf = context.getConfiguration(); + blockHeight = conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1); - outputBBt = - context.getConfiguration().getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false); + outputBBt = conf.getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false); if (outputBBt) { - int k = context.getConfiguration().getInt(QJob.PROP_K, -1); - int p = context.getConfiguration().getInt(QJob.PROP_P, -1); + int k = conf.getInt(QJob.PROP_K, -1); + int p = conf.getInt(QJob.PROP_P, -1); Validate.isTrue(k > 0, "invalid k parameter"); Validate.isTrue(p >= 0, "invalid p parameter"); mBBt = new UpperTriangular(k + p); - outputs = new MultipleOutputs(new JobConf(context.getConfiguration())); - closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs)); + } + String xiPathStr = conf.get(PROP_XI_PATH); + if (xiPathStr != null) { + xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf); } + + if (outputBBt || xi != null) { + outputs = new MultipleOutputs(new JobConf(conf)); + closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs)); + } + } @Override @@ -351,6 +395,19 @@ public final class BtJob { accum.plusBlock(bw); } + // MAHOUT-817: + if (key.get() == -1L) { + + Vector sq = accum.getRows()[0]; + + @SuppressWarnings("unchecked") + OutputCollector sqOut = + outputs.getCollector(OUTPUT_SQ, null); + + sqOut.collect(new IntWritable(0), new VectorWritable(sq)); + return; + } + /* * at this point, sum of rows should be in accum, so we just generate * outer self product of it and add to BBt accumulator. @@ -378,6 +435,18 @@ public final class BtJob { } } + // MAHOUT-817 + if (xi != null) { + // code defensively against shortened xi + int btIndex = btKey.get(); + double xii = xi.size() > btIndex ? 
xi.getQuick(btIndex) : 0.0; + // compute s_b + pmult.setMultiplicator(xii); + if (sbAccum == null) + sbAccum = new DenseVector(btRow.size()); + sbAccum.assign(btRow, pmult); + } + } } @@ -396,17 +465,27 @@ public final class BtJob { collector.collect(new IntWritable(), new VectorWritable(new DenseVector(mBBt.getData()))); } + + // MAHOUT-817 + if (sbAccum != null) { + @SuppressWarnings("unchecked") + OutputCollector collector = + outputs.getCollector(OUTPUT_SB, null); + + collector.collect(new IntWritable(), new VectorWritable(sbAccum)); + + } } finally { IOUtils.close(closeables); } } - } public static void run(Configuration conf, Path[] inputPathA, Path inputPathQJob, + Path xiPath, Path outputPath, int minSplitSize, int k, @@ -433,6 +512,19 @@ public final class BtJob { IntWritable.class, VectorWritable.class); } + if (xiPath != null) { + // compute pca -related stuff as well + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_SQ, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + IntWritable.class, + VectorWritable.class); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_SB, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + IntWritable.class, + VectorWritable.class); + } /* * HACK: we use old api multiple outputs since they are not available in the @@ -479,6 +571,13 @@ public final class BtJob { job.setNumReduceTasks(numReduceTasks); /* + * PCA-related options, MAHOUT-817 + */ + if (xiPath != null) { + job.getConfiguration().set(PROP_XI_PATH, xiPath.toString()); + } + + /* * we can broadhast Rhat files since all of them are reuqired by each job, * but not Q files which correspond to splits of A (so each split of A will * require only particular Q file, each time different one). Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java Wed Feb 22 21:57:27 2012 @@ -17,9 +17,19 @@ package org.apache.mahout.math.hadoop.stochasticsvd; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.Vector.Element; @@ -33,14 +43,14 @@ public class Omega { private final long seed; private final int kp; - public Omega(long seed, int k, int p) { + public Omega(long seed, int kp) { this.seed = seed; - kp = k + p; + this.kp = kp; } /** * Get omega element at (x,y) uniformly distributed within [-1...1) - * + * * @param row * omega row * @param column @@ -98,6 +108,68 @@ public class Omega { } } + /* + * computes t(Omega) %*% v in multithreaded fashion + */ + public Vector mutlithreadedTRightMultiply(final Vector v) { + + int nThreads = Runtime.getRuntime().availableProcessors(); + ExecutorService es = + new ThreadPoolExecutor(nThreads, + nThreads, + 1, + 
TimeUnit.SECONDS, + new ArrayBlockingQueue(kp)); + + try { + + List> dotFutures = new ArrayList>(kp); + + for (int i = 0; i < kp; i++) { + final int index = i; + + Future dotFuture = es.submit(new Callable() { + @Override + public Double call() throws Exception { + double result = 0.0; + if (v.isDense()) { + for (int k = 0; k < v.size(); k++) + // it's ok, this is reentrant + result += getQuick(k, index) * v.getQuick(k); + + } else { + for (Iterator iter = v.iterateNonZero(); iter.hasNext();) { + Vector.Element el = iter.next(); + int k = el.index(); + result += getQuick(k, index) * el.get(); + } + } + return result; + } + }); + dotFutures.add(dotFuture); + } + + try { + Vector res = new DenseVector(kp); + for (int i = 0; i < kp; i++) { + res.setQuick(i, dotFutures.get(i).get()); + } + return res; + } catch (InterruptedException exc) { + throw new RuntimeException("Interrupted", exc); + } catch (ExecutionException exc) { + if (exc.getCause() instanceof RuntimeException) + throw (RuntimeException) exc.getCause(); + else + throw new RuntimeException(exc.getCause()); + } + + } finally { + es.shutdown(); + } + } + protected void accumDots(int aIndex, double aElement, double[] yRow) { for (int i = 0; i < kp; i++) { yRow[i] += getQuick(aIndex, i) * aElement; Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java Wed Feb 22 21:57:27 2012 @@ -40,6 +40,7 @@ import org.apache.mahout.common.IOUtils; import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep; /** @@ -60,7 +61,9 @@ public final class QJob { public static final String PROP_OMEGA_SEED = "ssvd.omegaseed"; public static final String PROP_K = QRFirstStep.PROP_K; public static final String PROP_P = QRFirstStep.PROP_P; - public static final String PROP_AROWBLOCK_SIZE = QRFirstStep.PROP_AROWBLOCK_SIZE; + public static final String PROP_SB_PATH = "ssvdpca.sb.path"; + public static final String PROP_AROWBLOCK_SIZE = + QRFirstStep.PROP_AROWBLOCK_SIZE; public static final String OUTPUT_RHAT = "R"; public static final String OUTPUT_QHAT = "QHat"; @@ -77,23 +80,29 @@ public final class QJob { private SplitPartitionedWritable qHatKey; private SplitPartitionedWritable rHatKey; private Vector yRow; + private Vector sb; private Omega omega; private int kp; - private QRFirstStep qr; @Override protected void setup(Context context) throws IOException, InterruptedException { - - int k = Integer.parseInt(context.getConfiguration().get(PROP_K)); - int p = Integer.parseInt(context.getConfiguration().get(PROP_P)); + + Configuration conf = context.getConfiguration(); + int k = Integer.parseInt(conf.get(PROP_K)); + int p = Integer.parseInt(conf.get(PROP_P)); kp = k + p; - long omegaSeed = Long.parseLong(context.getConfiguration().get(PROP_OMEGA_SEED)); - omega = new Omega(omegaSeed, k, p); + long omegaSeed = Long.parseLong(conf.get(PROP_OMEGA_SEED)); + omega = new Omega(omegaSeed, k + p); + + String sbPathStr = 
conf.get(PROP_SB_PATH); + if (sbPathStr != null) { + sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf); + } - outputs = new MultipleOutputs(new JobConf(context.getConfiguration())); + outputs = new MultipleOutputs(new JobConf(conf)); closeables.addFirst(new Closeable() { @Override public void close() throws IOException { @@ -103,6 +112,7 @@ public final class QJob { qHatKey = new SplitPartitionedWritable(context); rHatKey = new SplitPartitionedWritable(context); + OutputCollector qhatCollector = new OutputCollector() { @@ -114,6 +124,7 @@ public final class QJob { qHatKey.incrementItemOrdinal(); } }; + OutputCollector rhatCollector = new OutputCollector() { @@ -126,31 +137,31 @@ public final class QJob { } }; - qr = - new QRFirstStep(context.getConfiguration(), - qhatCollector, - rhatCollector); + qr = new QRFirstStep(conf, qhatCollector, rhatCollector); closeables.addFirst(qr);// important: qr closes first!! - yRow=new DenseVector(kp); + yRow = new DenseVector(kp); } - + @Override protected void map(Writable key, VectorWritable value, Context context) throws IOException, InterruptedException { omega.computeYRow(value.get(), yRow); + if (sb != null) { + yRow.assign(sb, Functions.MINUS); + } qr.collect(key, yRow); } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { + protected void cleanup(Context context) throws IOException, + InterruptedException { IOUtils.close(closeables); } } public static void run(Configuration conf, Path[] inputPaths, + Path sbPath, Path outputPath, int aBlockRows, int minSplitSize, @@ -161,18 +172,16 @@ public final class QJob { InterruptedException, IOException { JobConf oldApiJob = new JobConf(conf); - MultipleOutputs - .addNamedOutput(oldApiJob, - OUTPUT_QHAT, - org.apache.hadoop.mapred.SequenceFileOutputFormat.class, - SplitPartitionedWritable.class, - DenseBlockWritable.class); - MultipleOutputs - .addNamedOutput(oldApiJob, - OUTPUT_RHAT, - org.apache.hadoop.mapred.SequenceFileOutputFormat.class, - SplitPartitionedWritable.class, - VectorWritable.class); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_QHAT, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + SplitPartitionedWritable.class, + DenseBlockWritable.class); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_RHAT, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + SplitPartitionedWritable.class, + VectorWritable.class); Job job = new Job(oldApiJob); job.setJobName("Q-job"); @@ -203,6 +212,9 @@ public final class QJob { job.getConfiguration().setLong(PROP_OMEGA_SEED, seed); job.getConfiguration().setInt(PROP_K, k); job.getConfiguration().setInt(PROP_P, p); + if (sbPath != null) { + job.getConfiguration().set(PROP_SB_PATH, sbPath.toString()); + } /* * number of reduce tasks doesn't matter. 
we don't actually send anything to Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java Wed Feb 22 21:57:27 2012 @@ -17,23 +17,18 @@ package org.apache.mahout.math.hadoop.stochasticsvd; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.common.AbstractJob; import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.math.DenseVector; -import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.hadoop.MatrixColumnMeansJob; /** * Mahout CLI adapter for SSVDSolver @@ -81,6 +76,13 @@ public class SSVDCli extends AbstractJob "br", "whether use distributed cache to broadcast matrices wherever possible", String.valueOf(true)); + addOption("pca", + "pca", + "run in pca mode: compute column-wise mean and subtract from input", + String.valueOf(false)); + addOption("pcaOffset", + "xi", + "path(glob) of external pca mean (optional, dont compute, use external mean"); addOption(DefaultOptionCreator.overwriteOption().create()); Map> pargs = parseArguments(args); @@ -101,6 +103,10 @@ public class SSVDCli extends AbstractJob boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma")); int reduceTasks = Integer.parseInt(getOption("reduceTasks")); boolean broadcast = Boolean.parseBoolean(getOption("broadcast")); + String xiPathStr = getOption("pcaOffset"); + Path xiPath = xiPathStr == null ? 
null : new Path(xiPathStr); + boolean pca = Boolean.parseBoolean(getOption("pca")) || xiPath != null; + boolean overwrite = pargs.containsKey(keyFor(DefaultOptionCreator.OVERWRITE_OPTION)); @@ -109,14 +115,17 @@ public class SSVDCli extends AbstractJob throw new IOException("No Hadoop configuration present"); } + Path[] inputPaths = new Path[] { getInputPath() }; + + // MAHOUT-817 + if (pca && xiPath == null) { + xiPath = new Path(getTempPath(), "xi"); + MatrixColumnMeansJob.run(conf, inputPaths[0], getTempPath()); + } + SSVDSolver solver = - new SSVDSolver(conf, - new Path[] { getInputPath() }, - getTempPath(), - r, - k, - p, - reduceTasks); + new SSVDSolver(conf, inputPaths, getTempPath(), r, k, p, reduceTasks); + solver.setMinSplitSize(minSplitSize); solver.setComputeU(computeU); solver.setComputeV(computeV); @@ -127,6 +136,7 @@ public class SSVDCli extends AbstractJob solver.setQ(q); solver.setBroadcast(broadcast); solver.setOverwrite(overwrite); + solver.setPcaMeanPath(xiPath); solver.run(); @@ -135,24 +145,8 @@ public class SSVDCli extends AbstractJob fs.mkdirs(getOutputPath()); - SequenceFile.Writer sigmaW = null; - - try { - sigmaW = - SequenceFile.createWriter(fs, - conf, - getOutputPath("sigma"), - NullWritable.class, - VectorWritable.class); - Writable sValues = - new VectorWritable(new DenseVector(Arrays.copyOf(solver.getSingularValues(), - k), - true)); - sigmaW.append(NullWritable.get(), sValues); - - } finally { - Closeables.closeQuietly(sigmaW); - } + Vector svalues = solver.getSingularValues().viewPart(0, k); + SSVDHelper.saveVector(svalues, getOutputPath("sigma"), conf); if (computeU) { FileStatus[] uFiles = fs.globStatus(new Path(solver.getUPath())); @@ -169,7 +163,6 @@ public class SSVDCli extends AbstractJob fs.rename(vf.getPath(), getOutputPath()); } } - } return 0; } Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java?rev=1292532&view=auto ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java (added) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java Wed Feb 22 21:57:27 2012 @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; + +/** + * set of small file manipulation helpers. + * + */ + +public class SSVDHelper { + + /** + * load single vector from an hdfs file (possibly presented as glob). + */ + static Vector loadVector(Path glob, Configuration conf) throws IOException { + + SequenceFileDirValueIterator iter = + new SequenceFileDirValueIterator(glob, + PathType.GLOB, + null, + null, + true, + conf); + + try { + if (!iter.hasNext()) + throw new IOException("Empty input while reading vector"); + VectorWritable vw = iter.next(); + + if (iter.hasNext()) + throw new IOException("Unexpected data after the end of vector file"); + + return vw.get(); + + } finally { + Closeables.closeQuietly(iter); + } + } + + /** + * save single vector into hdfs file. + * + * @param v + * vector to save + * @param vectorFilePath + * @param conf + * @throws IOException + */ + public static void saveVector(Vector v, + Path vectorFilePath, + Configuration conf) throws IOException { + VectorWritable vw = new VectorWritable(v); + FileSystem fs = FileSystem.get(conf); + SequenceFile.Writer w = + new SequenceFile.Writer(fs, + conf, + vectorFilePath, + IntWritable.class, + VectorWritable.class); + try { + w.append(new IntWritable(), vw); + } finally { + /* + * this is a writer, no quiet close please. we must bail out on incomplete + * close. 
+ */ + w.close(); + } + } + + /** + * sniff label type in the input files + */ + static Class sniffInputLabelType(Path[] inputPath, + Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + for (Path p : inputPath) { + FileStatus[] fstats = fs.globStatus(p); + if (fstats == null || fstats.length == 0) { + continue; + } + + FileStatus firstSeqFile; + if (!fstats[0].isDir()) { + firstSeqFile = fstats[0]; + } else { + firstSeqFile = + fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0]; + } + + SequenceFile.Reader r = null; + try { + r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf); + return r.getKeyClass().asSubclass(Writable.class); + } finally { + Closeables.closeQuietly(r); + } + } + throw new IOException("Unable to open input files to determine input label type."); + } + + private static final Pattern OUTPUT_FILE_PATTERN = + Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?"); + + static final Comparator PARTITION_COMPARATOR = + new Comparator() { + private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher(""); + + @Override + public int compare(FileStatus o1, FileStatus o2) { + matcher.reset(o1.getPath().getName()); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" + + o1.getPath()); + } + int p1 = Integer.parseInt(matcher.group(3)); + matcher.reset(o2.getPath().getName()); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" + + o2.getPath()); + } + + int p2 = Integer.parseInt(matcher.group(3)); + return p1 - p2; + } + + }; + + /** + * helper capabiltiy to load distributed row matrices into dense matrix (to + * support tests mainly). + * + * @param fs + * filesystem + * @param glob + * FS glob + * @param conf + * configuration + * @return Dense matrix array + * @throws IOException + * when I/O occurs. + */ + public static double[][] loadDistributedRowMatrix(FileSystem fs, + Path glob, + Configuration conf) + throws IOException { + + FileStatus[] files = fs.globStatus(glob); + if (files == null) { + return null; + } + + List denseData = Lists.newArrayList(); + + /* + * assume it is partitioned output, so we need to read them up in order of + * partitions. + */ + Arrays.sort(files, PARTITION_COMPARATOR); + + for (FileStatus fstat : files) { + for (VectorWritable value : new SequenceFileValueIterable(fstat.getPath(), + true, + conf)) { + Vector v = value.get(); + int size = v.size(); + double[] row = new double[size]; + for (int i = 0; i < size; i++) { + row[i] = v.get(i); + } + // ignore row label. + denseData.add(row); + } + } + + return denseData.toArray(new double[denseData.size()][]); + } + + /** + * Load multiplel upper triangular matrices and sum them up. + * + * @param fs + * @param glob + * @param conf + * @return the sum of upper triangular inputs. + * @throws IOException + */ + public static UpperTriangular + loadAndSumUpperTriangularMatrices(Path glob, Configuration conf) + throws IOException { + Vector v = loadAndSumUpVectors(glob, conf); + return v == null ? 
null : new UpperTriangular(v); + } + + /** + * returns sum of all vectors in different files specified by glob + * + * @param glob + * @param conf + * @return + * @throws IOException + */ + public static Vector loadAndSumUpVectors(Path glob, Configuration conf) + throws IOException { + + SequenceFileDirValueIterator iter = + new SequenceFileDirValueIterator(glob, + PathType.GLOB, + null, + PARTITION_COMPARATOR, + true, + conf); + + try { + Vector v = null; + while (iter.hasNext()) { + if (v == null) + v = new DenseVector(iter.next().get()); + else + v.assign(iter.next().get(), Functions.PLUS); + } + return v; + + } finally { + Closeables.closeQuietly(iter); + } + + } + + /** + * Load only one upper triangular matrix and issue error if mroe than one is + * found. + * + * @param fs + * @param glob + * @param conf + * @return + * @throws IOException + */ + public static UpperTriangular loadUpperTriangularMatrix(FileSystem fs, + Path glob, + Configuration conf) + throws IOException { + + /* + * there still may be more than one file in glob and only one of them must + * contain the matrix. + */ + + SequenceFileDirValueIterator iter = + new SequenceFileDirValueIterator(glob, + PathType.GLOB, + null, + null, + true, + conf); + try { + if (!iter.hasNext()) + throw new IOException("No triangular matrices found"); + Vector v = iter.next().get(); + UpperTriangular result = new UpperTriangular(v); + if (iter.hasNext()) + throw new IOException("Unexpected overrun in upper triangular matrix files"); + return result; + + } finally { + iter.close(); + } + } + + /** + * extracts row-wise raw data from a Mahout matrix for 3rd party solvers. + * Unfortunately values member is 100% encapsulated in {@link DenseMatrix} at + * this point, so we have to resort to abstract element-wise copying. 
+ * + * @param m + * @return + */ + public static double[][] extractRawData(Matrix m) { + int rows = m.numRows(); + int cols = m.numCols(); + double[][] result = new double[rows][]; + for (int i = 0; i < rows; i++) { + result[i] = new double[cols]; + for (int j = 0; j < cols; j++) { + result[i][j] = m.getQuick(i, j); + } + } + return result; + } + +} Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java Wed Feb 22 21:57:27 2012 @@ -19,33 +19,24 @@ package org.apache.mahout.math.hadoop.st import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; import java.util.Deque; -import java.util.List; import java.util.Random; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Writable; import org.apache.mahout.common.IOUtils; import org.apache.mahout.common.RandomUtils; -import org.apache.mahout.common.iterator.sequencefile.PathFilters; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.DenseMatrix; import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.DistributedRowMatrixWriter; +import org.apache.mahout.math.Matrix; import org.apache.mahout.math.Vector; -import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; import org.apache.mahout.math.ssvd.EigenSolverWrapper; import com.google.common.collect.Lists; -import com.google.common.io.Closeables; /** * Stochastic SVD solver (API class). @@ -96,7 +87,7 @@ import com.google.common.io.Closeables; */ public class SSVDSolver { - private double[] svalues; + private Vector svalues; private boolean computeU = true; private boolean computeV = true; private String uPath; @@ -118,6 +109,7 @@ public class SSVDSolver { private boolean cVHalfSigma; private boolean overwrite; private boolean broadcast = true; + private Path pcaMeanPath; /** * create new SSVD solver. Required parameters are passed to constructor to @@ -216,7 +208,7 @@ public class SSVDSolver { * * @return singlular values (largest to smallest) */ - public double[] getSingularValues() { + public Vector getSingularValues() { return svalues; } @@ -300,6 +292,30 @@ public class SSVDSolver { } /** + * Optional. Single-vector file path for a vector (aka xi in MAHOUT-817 + * working notes) to be subtracted from each row of input. + *

+ *
+ * A brute-force approach would be to turn the input into a dense matrix, which
+ * is often not very desirable. By supplying this offset to the SSVD solver, we
+ * can avoid most of that overhead due to the increased input density.
+ *
+ * The vector size for this offset is n (the width of the A input). In PCA and R
+ * this is known as the "column means", but in this case it can be any offset of
+ * the row vectors to propagate into the SSVD solution.
+ *

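A minimal caller-side sketch of wiring this option in, assuming the offset vector xi has already been computed in-core. The class, paths and parameter values are made up; the class sits in the solver's package only so the package-level helper added by this patch is reachable; the constructor argument list is quoted from the existing SSVDSolver constructor and may not match exactly. Treat it as an illustration, not a prescribed recipe.

    package org.apache.mahout.math.hadoop.stochasticsvd;   // hypothetical example class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;

    public final class SSVDPcaUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // xi has one entry per column of A; the column means are the usual PCA choice.
        Vector xi = new DenseVector(new double[] {0.5, 1.25, -0.3});

        // Persist xi as a single-vector sequence file (helper introduced by this patch).
        Path xiPath = new Path("/tmp/ssvd-pca/xi.seq");     // made-up location
        SSVDHelper.saveVector(xi, xiPath, conf);

        SSVDSolver solver =
            new SSVDSolver(conf,
                           new Path[] {new Path("/tmp/ssvd-pca/A")},  // made-up input
                           new Path("/tmp/ssvd-pca/out"),
                           10000,   // ablockRows
                           20,      // k
                           15,      // p
                           1);      // reduceTasks
        solver.setPcaMeanPath(xiPath);   // the new MAHOUT-817 option
        solver.run();

        System.out.println(solver.getSingularValues());   // now a Vector, per this change
      }
    }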
+ * + */ + public Path getPcaMeanPath() { + return pcaMeanPath; + } + + public void setPcaMeanPath(Path pcaMeanPath) { + this.pcaMeanPath = pcaMeanPath; + } + + /** * run all SSVD jobs. * * @throws IOException @@ -310,8 +326,8 @@ public class SSVDSolver { Deque closeables = Lists.newLinkedList(); try { Class labelType = - sniffInputLabelType(inputPath, conf); - FileSystem fs = FileSystem.get(outputPath.toUri(), conf); + SSVDHelper.sniffInputLabelType(inputPath, conf); + FileSystem fs = FileSystem.get(conf); Path qPath = new Path(outputPath, "Q-job"); Path btPath = new Path(outputPath, "Bt-job"); @@ -320,15 +336,47 @@ public class SSVDSolver { Path uPath = new Path(outputPath, "U"); Path vPath = new Path(outputPath, "V"); + Path pcaBasePath = new Path(outputPath, "pca"); + Path sbPath = null; + Path sqPath = null; + + double xisquaredlen = 0; + + if (pcaMeanPath != null) + fs.mkdirs(pcaBasePath); + Random rnd = RandomUtils.getRandom(); + long seed = rnd.nextLong(); + + if (pcaMeanPath != null) { + /* + * combute s_b0 if pca offset present. + * + * Just in case, we treat xi path as a possible reduce or otherwise + * multiple task output that we assume we need to sum up partial + * components. If it is just one file, it will work too. + */ + + Vector xi = SSVDHelper.loadAndSumUpVectors(pcaMeanPath, conf); + xisquaredlen = xi.dot(xi); + Omega omega = new Omega(seed, k + p); + Vector s_b0 = omega.mutlithreadedTRightMultiply(xi); + + SSVDHelper.saveVector(s_b0, sbPath = + new Path(pcaBasePath, "somega.seq"), conf); + } + if (overwrite) { fs.delete(outputPath, true); } - Random rnd = RandomUtils.getRandom(); - long seed = rnd.nextLong(); + /* + * if we work with pca offset, we need to precompute s_bq0 aka s_omega for + * jobs to use. + */ QJob.run(conf, inputPath, + sbPath, qPath, ablockRows, minSplitSize, @@ -344,9 +392,13 @@ public class SSVDSolver { * bit too many (I would be happy i that were ever the case though). */ + sbPath = new Path(pcaBasePath, "sb0"); + sqPath = new Path(pcaBasePath, "sq0"); + BtJob.run(conf, inputPath, qPath, + pcaMeanPath, btPath, minSplitSize, k, @@ -357,6 +409,9 @@ public class SSVDSolver { labelType, q <= 0); + sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*"); + sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*"); + // power iterations for (int i = 0; i < q; i++) { @@ -365,6 +420,9 @@ public class SSVDSolver { ABtDenseOutJob.run(conf, inputPath, btPathGlob, + pcaMeanPath, + sqPath, + sbPath, qPath, ablockRows, minSplitSize, @@ -379,6 +437,7 @@ public class SSVDSolver { BtJob.run(conf, inputPath, qPath, + pcaMeanPath, btPath, minSplitSize, k, @@ -388,103 +447,92 @@ public class SSVDSolver { broadcast, labelType, i == q - 1); + sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*"); + sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*"); } - UpperTriangular bbt = - loadAndSumUpperTriangularMatrices(fs, new Path(btPath, BtJob.OUTPUT_BBT - + "-*"), conf); + UpperTriangular bbtTriangular = + SSVDHelper.loadAndSumUpperTriangularMatrices(new Path(btPath, + BtJob.OUTPUT_BBT + + "-*"), conf); // convert bbt to something our eigensolver could understand - assert bbt.columnSize() == k + p; + assert bbtTriangular.columnSize() == k + p; - double[][] bbtSquare = new double[k + p][]; - for (int i = 0; i < k + p; i++) { - bbtSquare[i] = new double[k + p]; - } + /* + * we currently use a 3rd party in-core eigensolver. So we need just a + * dense array representation for it. 
+ */ + DenseMatrix bbtSquare = new DenseMatrix(k+p,k+p); for (int i = 0; i < k + p; i++) { for (int j = i; j < k + p; j++) { - bbtSquare[i][j] = bbtSquare[j][i] = bbt.getQuick(i, j); + double val = bbtTriangular.getQuick(i, j); + bbtSquare.setQuick(i, j, val); + bbtSquare.setQuick(j, i, val); } } + + // MAHOUT-817 + if (pcaMeanPath != null) { + Vector sq = SSVDHelper.loadAndSumUpVectors(sqPath, conf); + Vector sb = SSVDHelper.loadAndSumUpVectors(sbPath, conf); + Matrix mC = sq.cross(sb); - svalues = new double[k + p]; + bbtSquare.assign(mC, Functions.MINUS); + bbtSquare.assign(mC.transpose(), Functions.MINUS); + mC = null; - // try something else. - EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtSquare); + Matrix outerSq = sq.cross(sq); + outerSq.assign(Functions.mult(xisquaredlen)); + bbtSquare.assign(outerSq, Functions.PLUS); - double[] eigenva2 = eigenWrapper.getEigenValues(); - for (int i = 0; i < k + p; i++) { - svalues[i] = Math.sqrt(eigenva2[i]); // sqrt? - } - - // save/redistribute UHat - double[][] uHat = eigenWrapper.getUHat(); - - fs.mkdirs(uHatPath); - SequenceFile.Writer uHatWriter = - SequenceFile.createWriter(fs, - conf, - uHatPath = new Path(uHatPath, "uhat.seq"), - IntWritable.class, - VectorWritable.class, - CompressionType.BLOCK); - closeables.addFirst(uHatWriter); - - int m = uHat.length; - IntWritable iw = new IntWritable(); - VectorWritable vw = new VectorWritable(); - for (int i = 0; i < m; i++) { - vw.set(new DenseVector(uHat[i], true)); - iw.set(i); - uHatWriter.append(iw, vw); } - closeables.remove(uHatWriter); - Closeables.closeQuietly(uHatWriter); + EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(SSVDHelper.extractRawData(bbtSquare)); + Matrix uHat = new DenseMatrix(eigenWrapper.getUHat()); + svalues = new DenseVector(eigenWrapper.getEigenValues()); - SequenceFile.Writer svWriter = - SequenceFile.createWriter(fs, - conf, - svPath = new Path(svPath, "svalues.seq"), - IntWritable.class, - VectorWritable.class, - CompressionType.BLOCK); + svalues.assign(Functions.SQRT); - closeables.addFirst(svWriter); - - vw.set(new DenseVector(svalues, true)); - svWriter.append(iw, vw); + // save/redistribute UHat + fs.mkdirs(uHatPath); + DistributedRowMatrixWriter.write(uHatPath = + new Path(uHatPath, "uhat.seq"), conf, uHat); - closeables.remove(svWriter); - Closeables.closeQuietly(svWriter); + // save sigma. 
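The MAHOUT-817 block above has a compact algebraic reading (an interpretation of the working notes; identifying sq with Q^T 1 and sb with B xi is an assumption, not something this diff states). With the shifted input A' = A - \mathbf{1}\xi^\top and B' = Q^\top A':

    B' = B - s_q\,\xi^\top,          where  s_q = Q^\top\mathbf{1},  s_b = B\,\xi
    B'B'^\top = BB^\top - s_q s_b^\top - s_b s_q^\top + \lVert\xi\rVert^2\, s_q s_q^\top

The three assign() calls on bbtSquare implement the last three terms: subtract sq.cross(sb) and its transpose, then add xisquaredlen times sq.cross(sq).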
+ SSVDHelper.saveVector(svalues, + svPath = new Path(svPath, "svalues.seq"), + conf); UJob ujob = null; if (computeU) { ujob = new UJob(); - ujob.start(conf, - new Path(btPath, BtJob.OUTPUT_Q + "-*"), - uHatPath, - svPath, - uPath, - k, - reduceTasks, - labelType, - cUHalfSigma); + ujob.run(conf, + new Path(btPath, BtJob.OUTPUT_Q + "-*"), + uHatPath, + svPath, + uPath, + k, + reduceTasks, + labelType, + cUHalfSigma); // actually this is map-only job anyway } VJob vjob = null; if (computeV) { vjob = new VJob(); - vjob.start(conf, - new Path(btPath, BtJob.OUTPUT_BT + "-*"), - uHatPath, - svPath, - vPath, - k, - reduceTasks, - cVHalfSigma); + vjob.run(conf, + new Path(btPath, BtJob.OUTPUT_BT + "-*"), + pcaMeanPath, + sqPath, + uHatPath, + svPath, + vPath, + k, + reduceTasks, + cVHalfSigma); } if (ujob != null) { @@ -506,195 +554,4 @@ public class SSVDSolver { } } - - private static Class - sniffInputLabelType(Path[] inputPath, Configuration conf) - throws IOException { - FileSystem fs = FileSystem.get(inputPath[0].toUri(), conf); - for (Path p : inputPath) { - FileStatus[] fstats = fs.globStatus(p); - if (fstats == null || fstats.length == 0) { - continue; - } - - FileStatus firstSeqFile; - if (fstats[0].isDir()) { - firstSeqFile = fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0]; - } else { - firstSeqFile = fstats[0]; - } - - SequenceFile.Reader r = null; - try { - r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf); - return r.getKeyClass().asSubclass(Writable.class); - } finally { - Closeables.closeQuietly(r); - } - } - throw new IOException("Unable to open input files to determine input label type."); - } - - private static final Pattern OUTPUT_FILE_PATTERN = - Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?"); - - static final Comparator PARTITION_COMPARATOR = - new Comparator() { - private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher(""); - - @Override - public int compare(FileStatus o1, FileStatus o2) { - matcher.reset(o1.getPath().getName()); - if (!matcher.matches()) { - throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" - + o1.getPath()); - } - int p1 = Integer.parseInt(matcher.group(3)); - matcher.reset(o2.getPath().getName()); - if (!matcher.matches()) { - throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" - + o2.getPath()); - } - - int p2 = Integer.parseInt(matcher.group(3)); - return p1 - p2; - } - - }; - - /** - * helper capabiltiy to load distributed row matrices into dense matrix (to - * support tests mainly). - * - * @param fs - * filesystem - * @param glob - * FS glob - * @param conf - * configuration - * @return Dense matrix array - * @throws IOException - * when I/O occurs. - */ - public static double[][] loadDistributedRowMatrix(FileSystem fs, - Path glob, - Configuration conf) - throws IOException { - - FileStatus[] files = fs.globStatus(glob); - if (files == null) { - return null; - } - - List denseData = Lists.newArrayList(); - - /* - * assume it is partitioned output, so we need to read them up in order of - * partitions. - */ - Arrays.sort(files, PARTITION_COMPARATOR); - - for (FileStatus fstat : files) { - for (VectorWritable value : new SequenceFileValueIterable(fstat.getPath(), - true, - conf)) { - Vector v = value.get(); - int size = v.size(); - double[] row = new double[size]; - for (int i = 0; i < size; i++) { - row[i] = v.get(i); - } - // ignore row label. 
- denseData.add(row); - } - } - - return denseData.toArray(new double[denseData.size()][]); - } - - /** - * Load multiplel upper triangular matrices and sum them up. - * - * @param fs - * @param glob - * @param conf - * @return the sum of upper triangular inputs. - * @throws IOException - */ - public static UpperTriangular - loadAndSumUpperTriangularMatrices(FileSystem fs, - Path glob, - Configuration conf) throws IOException { - - FileStatus[] files = fs.globStatus(glob); - if (files == null) { - return null; - } - - /* - * assume it is partitioned output, so we need to read them up in order of - * partitions. - */ - Arrays.sort(files, PARTITION_COMPARATOR); - - DenseVector result = null; - for (FileStatus fstat : files) { - for (VectorWritable value : new SequenceFileValueIterable(fstat.getPath(), - true, - conf)) { - Vector v = value.get(); - if (result == null) { - result = new DenseVector(v); - } else { - result.addAll(v); - } - } - } - - if (result == null) { - throw new IOException("Unexpected underrun in upper triangular matrix files"); - } - return new UpperTriangular(result); - } - - /** - * Load only one upper triangular matrix and issue error if mroe than one is - * found. - */ - public static UpperTriangular loadUpperTriangularMatrix(FileSystem fs, - Path glob, - Configuration conf) - throws IOException { - - FileStatus[] files = fs.globStatus(glob); - if (files == null) { - return null; - } - - /* - * assume it is partitioned output, so we need to read them up in order of - * partitions. - */ - Arrays.sort(files, PARTITION_COMPARATOR); - - UpperTriangular result = null; - for (FileStatus fstat : files) { - for (VectorWritable value : new SequenceFileValueIterable(fstat.getPath(), - true, - conf)) { - Vector v = value.get(); - if (result == null) { - result = new UpperTriangular(v); - } else { - throw new IOException("Unexpected overrun in upper triangular matrix files"); - } - } - } - - if (result == null) { - throw new IOException("Unexpected underrun in upper triangular matrix files"); - } - return result; - } - } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java?rev=1292532&r1=1292531&r2=1292532&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java Wed Feb 22 21:57:27 2012 @@ -37,6 +37,7 @@ import org.apache.mahout.math.DenseVecto import org.apache.mahout.math.Matrix; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; /** * Computes U=Q*Uhat of SSVD (optionally adding x pow(Sigma, 0.5) ) @@ -51,7 +52,7 @@ public class UJob { private Job job; - public void start(Configuration conf, Path inputPathQ, Path inputUHatPath, + public void run(Configuration conf, Path inputPathQ, Path inputUHatPath, Path sigmaPath, Path outputPath, int k, int numReduceTasks, Class labelClass, boolean uHalfSigma) throws ClassNotFoundException, InterruptedException, IOException { @@ -135,7 +136,7 @@ public class UJob { Path sigmaPath = new Path(context.getConfiguration().get(PROP_SIGMA_PATH)); FileSystem fs = FileSystem.get(uHatPath.toUri(), context.getConfiguration()); - uHat = new DenseMatrix(SSVDSolver.loadDistributedRowMatrix(fs, 
+ uHat = new DenseMatrix(SSVDHelper.loadDistributedRowMatrix(fs, uHatPath, context.getConfiguration())); // since uHat is (k+p) x (k+p) kp = uHat.columnSize(); @@ -144,11 +145,8 @@ public class UJob { uRowWritable = new VectorWritable(uRow); if (context.getConfiguration().get(PROP_U_HALFSIGMA) != null) { - sValues = new DenseVector(SSVDSolver.loadDistributedRowMatrix(fs, - sigmaPath, context.getConfiguration())[0], true); - for (int i = 0; i < k; i++) { - sValues.setQuick(i, Math.sqrt(sValues.getQuick(i))); - } + sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration()); + sValues.assign(Functions.SQRT); } }
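For reference, the half-sigma convention behind this (map-only) job can be written as follows; this is a reading of the code, not text from the patch. SSVDSolver stores sigma_i = \sqrt{\lambda_i(BB^\top)} in svalues.seq, and with the half-sigma options the output rows are scaled by one more square root:

    U_{1/2} = Q\,\hat{U}\,\Sigma^{1/2},    V_{1/2} = V\,\Sigma^{1/2},    U_{1/2}\,V_{1/2}^\top = U\,\Sigma\,V^\top \approx A

which is why sValues.assign(Functions.SQRT) above is applied to the already-square-rooted eigenvalues loaded from the sigma path.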