Subject: svn commit: r1491191 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ core/src/main/java/org/apache/mahout/vectorizer/ core/src/main/java/org/apache/mahout/vectorizer/term/ core/src/main/java/org/apache/mahout/...
Date: Sun, 09 Jun 2013 12:17:45 -0000
To: commits@mahout.apache.org
From: gsingers@apache.org
Reply-To: dev@mahout.apache.org

Author: gsingers
Date: Sun Jun 9 12:17:45 2013
New Revision: 1491191

URL: http://svn.apache.org/r1491191
Log:
MAHOUT-1103: properly partition the data for MapReduce

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
    mahout/trunk/examples/bin/cluster-reuters.sh

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java Sun Jun 9 12:17:45 2013 @@ -17,18 +17,21 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import java.io.IOException; -import java.util.Iterator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; +import org.apache.mahout.clustering.iterator.ClusterWritable; import org.apache.mahout.common.iterator.sequencefile.PathFilters; import org.apache.mahout.common.iterator.sequencefile.PathType; import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + /** * Reads the number of clusters produced by the clustering algorithm. */ @@ -39,11 +42,9 @@ public final class ClusterCountReader { /** * Reads the number of clusters present by reading the clusters-*-final file. - * - * @param clusterOutputPath - * The output path provided to the clustering algorithm. - * @param conf - * The hadoop configuration. + * + * @param clusterOutputPath The output path provided to the clustering algorithm. + * @param conf The hadoop configuration. * @return the number of final clusters. */ public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException { @@ -51,11 +52,11 @@ public final class ClusterCountReader { FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter()); int numberOfClusters = 0; Iterator it = new SequenceFileDirValueIterator(clusterFiles[0].getPath(), - PathType.LIST, - PathFilters.partFilter(), - null, - true, - conf); + PathType.LIST, + PathFilters.partFilter(), + null, + true, + conf); while (it.hasNext()) { it.next(); numberOfClusters++; @@ -63,4 +64,38 @@ public final class ClusterCountReader { return numberOfClusters; } + /** + * Generates a list of all cluster ids by reading the clusters-*-final file. + * + * @param clusterOutputPath The output path provided to the clustering algorithm. + * @param conf The hadoop configuration. + * @return An ArrayList containing the final cluster ids. 
+ */ + public static Map getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId) throws IOException { + Map clusterIds = new HashMap(); + FileSystem fileSystem = clusterOutputPath.getFileSystem(conf); + FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter()); + //System.out.println("LOOK HERE: " + clusterOutputPath); + Iterator it = new SequenceFileDirValueIterator(clusterFiles[0].getPath(), + PathType.LIST, + PathFilters.partFilter(), + null, + true, + conf); + int i = 0; + while (it.hasNext()) { + Integer key, value; + if (keyIsClusterId == true) { // key is the cluster id, value is i, the index we will use + key = it.next().getValue().getId(); + value = i; + } else { + key = i; + value = it.next().getValue().getId(); + } + clusterIds.put(key, value); + i++; + } + return clusterIds; + } + } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java Sun Jun 9 12:17:45 2013 @@ -17,10 +17,6 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,77 +33,80 @@ import org.apache.mahout.common.iterator import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; import org.apache.mahout.math.VectorWritable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + /** * This class reads the output of any clustering algorithm, and, creates separate directories for different * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster * directory associated with that point. - * + *

* This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered * sequentially. - * + *

* The sequential and non sequential version, both are being used from {@link ClusterOutputPostProcessorDriver}. */ public final class ClusterOutputPostProcessor { - + private Path clusteredPoints; private final FileSystem fileSystem; private final Configuration conf; private final Path clusterPostProcessorOutput; - private final Map postProcessedClusterDirectories = new HashMap(); + private final Map postProcessedClusterDirectories = new HashMap(); private long uniqueVectorId = 0L; - private final Map writersForClusters; - + private final Map writersForClusters; + public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed, Path output, Configuration hadoopConfiguration) throws IOException { this.clusterPostProcessorOutput = output; this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed); this.conf = hadoopConfiguration; - this.writersForClusters = new HashMap(); - fileSystem = clusteredPoints.getFileSystem(conf); + this.writersForClusters = new HashMap(); + fileSystem = clusteredPoints.getFileSystem(conf); } - + /** * This method takes the clustered points output by the clustering algorithms as input and writes them into * their respective clusters. */ public void process() throws IOException { createPostProcessDirectory(); - for (Pair record - : new SequenceFileDirIterable(clusteredPoints, - PathType.GLOB, - PathFilters.partFilter(), - null, - false, - conf)) { + for (Pair record + : new SequenceFileDirIterable(clusteredPoints, + PathType.GLOB, + PathFilters.partFilter(), + null, + false, + conf)) { String clusterId = record.getFirst().toString().trim(); putVectorInRespectiveCluster(clusterId, record.getSecond()); } IOUtils.close(writersForClusters.values()); writersForClusters.clear(); } - + /** * Creates the directory to put post processed clusters. */ private void createPostProcessDirectory() throws IOException { if (!fileSystem.exists(clusterPostProcessorOutput) - && !fileSystem.mkdirs(clusterPostProcessorOutput)) { + && !fileSystem.mkdirs(clusterPostProcessorOutput)) { throw new IOException("Error creating cluster post processor directory"); } } - + /** - * * Finds out the cluster directory of the vector and writes it into the specified cluster. */ private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException { Writer writer = findWriterForVector(clusterId); postProcessedClusterDirectories.put(clusterId, - PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId)); + PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId)); writeVectorToCluster(writer, point); } - + /** * Finds out the path in cluster where the point is supposed to be written. */ @@ -121,7 +120,7 @@ public final class ClusterOutputPostProc } return writer; } - + /** * Writes vector to the cluster directory. */ @@ -129,16 +128,16 @@ public final class ClusterOutputPostProc writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector())); writer.sync(); } - + /** * @return the set of all post processed cluster paths. 
*/ - public Map getPostProcessedClusterDirectories() { + public Map getPostProcessedClusterDirectories() { return postProcessedClusterDirectories; } - + public void setClusteredPoints(Path clusteredPoints) { this.clusteredPoints = clusteredPoints; } - + } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java Sun Jun 9 12:17:45 2013 @@ -17,13 +17,11 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -32,71 +30,74 @@ import org.apache.hadoop.mapreduce.lib.o import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; import org.apache.mahout.common.commandline.DefaultOptionCreator; import org.apache.mahout.common.iterator.sequencefile.PathFilters; import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; import org.apache.mahout.math.VectorWritable; +import java.io.IOException; + /** * Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be * used for top down clustering. It can also be used if the clustering output needs to be grouped into their * respective clusters. */ public final class ClusterOutputPostProcessorDriver extends AbstractJob { - + /** * CLI to run clustering post processor. The input to post processor is the ouput path specified to the * clustering. */ @Override public int run(String[] args) throws Exception { - addInputOption(); addOutputOption(); addOption(DefaultOptionCreator.methodOption().create()); + addOption(DefaultOptionCreator.overwriteOption().create()); if (parseArguments(args) == null) { return -1; } - Path input = getInputPath(); Path output = getOutputPath(); if (getConf() == null) { setConf(new Configuration()); } + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(getConf(), output); + } boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase( - DefaultOptionCreator.SEQUENTIAL_METHOD); + DefaultOptionCreator.SEQUENTIAL_METHOD); run(input, output, runSequential); return 0; - + } - + /** * Constructor to be used by the ToolRunner. */ - private ClusterOutputPostProcessorDriver() {} - + private ClusterOutputPostProcessorDriver() { + } + public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args); } - + /** * Post processes the output of clustering algorithms and groups them into respective clusters. 
Each * cluster's vectors are written into a directory named after its clusterId. - * - * @param input - * The output path provided to the clustering algorithm, whose would be post processed. Hint : The - * path of the directory containing clusters-*-final and clusteredPoints. - * @param output - * The post processed data would be stored at this path. - * @param runSequential - * If set to true, post processes it sequentially, else, uses. MapReduce. Hint : If the clustering - * was done sequentially, make it sequential, else vice versa. + * + * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data would be stored at this path. + * @param runSequential If set to true, post processes it sequentially, else, uses. MapReduce. Hint : If the clustering + * was done sequentially, make it sequential, else vice versa. */ public static void run(Path input, Path output, boolean runSequential) throws IOException, - InterruptedException, - ClassNotFoundException { + InterruptedException, + ClassNotFoundException { if (runSequential) { postProcessSeq(input, output); } else { @@ -104,81 +105,76 @@ public final class ClusterOutputPostProc postProcessMR(conf, input, output); movePartFilesToRespectiveDirectories(conf, output); } - + } - + /** * Process Sequentially. Reads the vectors one by one, and puts them into respective directory, named after * their clusterId. - * - * @param input - * The output path provided to the clustering algorithm, whose would be post processed. Hint : The - * path of the directory containing clusters-*-final and clusteredPoints. - * @param output - * The post processed data would be stored at this path. + * + * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data would be stored at this path. */ private static void postProcessSeq(Path input, Path output) throws IOException { ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output, - new Configuration()); + new Configuration()); clusterOutputPostProcessor.process(); } - + /** * Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the * output. So that each cluster's vector is written in its own part file. - * - * @param conf - * The hadoop configuration. - * @param input - * The output path provided to the clustering algorithm, whose would be post processed. Hint : The - * path of the directory containing clusters-*-final and clusteredPoints. - * @param output - * The post processed data would be stored at this path. + * + * @param conf The hadoop configuration. + * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data would be stored at this path. 
*/ private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException, - InterruptedException, - ClassNotFoundException { + InterruptedException, + ClassNotFoundException { + System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, as the MapReduce option will not work properly"); + int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf); + conf.set("clusterOutputPath", input.toString()); Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapperClass(ClusterOutputPostProcessorMapper.class); - job.setMapOutputKeyClass(Text.class); + job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(VectorWritable.class); job.setReducerClass(ClusterOutputPostProcessorReducer.class); - job.setOutputKeyClass(Text.class); + job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(VectorWritable.class); - int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf); job.setNumReduceTasks(numberOfClusters); job.setJarByClass(ClusterOutputPostProcessorDriver.class); - + FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints"))); FileOutputFormat.setOutputPath(job, output); if (!job.waitForCompletion(true)) { throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input); } } - + /** * The mapreduce version of the post processor writes different clusters into different part files. This * method reads the part files and moves them into directories named after their clusterIds. - * - * @param conf - * The hadoop configuration. - * @param output - * The post processed data would be stored at this path. + * + * @param conf The hadoop configuration. + * @param output The post processed data would be stored at this path. */ private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException { FileSystem fileSystem = output.getFileSystem(conf); for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) { - SequenceFileIterator it = - new SequenceFileIterator(fileStatus.getPath(), true, conf); + SequenceFileIterator it = + new SequenceFileIterator(fileStatus.getPath(), true, conf); if (it.hasNext()) { renameFile(it.next().getFirst(), fileStatus, conf); } it.close(); } } - + /** * Using @FileSystem rename method to move the file. 
*/ Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java Sun Jun 9 12:17:45 2013 @@ -17,26 +17,41 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import java.io.IOException; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.mahout.clustering.classify.WeightedVectorWritable; import org.apache.mahout.math.VectorWritable; +import java.io.IOException; +import java.util.Map; + /** * Mapper for post processing cluster output. */ public class ClusterOutputPostProcessorMapper extends - Mapper { - - /** - * The key is the cluster id and the value is the vector. - */ + Mapper { + + private Map newClusterMappings; + private VectorWritable outputVector; + + //read the current cluster ids, and populate the cluster mapping hash table + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + //this give the clusters-x-final directory where the cluster ids can be read + Path clusterOutputPath = new Path(conf.get("clusterOutputPath")); + //we want the key to be the cluster id, the value to be the index + newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true); + outputVector = new VectorWritable(); + } + @Override - protected void map(IntWritable key, WeightedVectorWritable vector, Context context) throws IOException, - InterruptedException { - context.write(new Text(key.toString().trim()), new VectorWritable(vector.getVector())); + public void map(IntWritable key, WeightedVectorWritable val, Context context) throws IOException, InterruptedException { + //by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to it's own reducer, since they + //are numbered from 0 to k-1, where k is the number of clusters + outputVector.set(val.getVector()); + context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector); } } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java Sun Jun 9 12:17:45 2013 @@ -17,25 +17,46 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import java.io.IOException; - -import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.math.VectorWritable; +import java.io.IOException; +import java.util.Map; + /** * Reducer for post processing cluster output. */ -public class ClusterOutputPostProcessorReducer extends Reducer { +public class ClusterOutputPostProcessorReducer extends Reducer { + + + private Map reverseClusterMappings; + + //read the current cluster ids, and populate the hash cluster mapping hash table + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + Path clusterOutputPath = new Path(conf.get("clusterOutputPath")); + //we want to the key to be the index, the value to be the cluster id + reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false); + } + /** - * The key is the cluster id and the values contains the points in that cluster. + * The key is the remapped cluster id and the values contains the vectors in that cluster. */ @Override - protected void reduce(Text key, Iterable values, Context context) throws IOException, - InterruptedException { + protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, + InterruptedException { + //remap the cluster back to its original id + //and then output the vectors with their correct + //cluster id. + IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get())); + System.out.println(outKey + " this: " + this); for (VectorWritable value : values) { - context.write(key, value); + context.write(outKey, value); } } - + } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java Sun Jun 9 12:17:45 2013 @@ -52,6 +52,8 @@ import org.apache.mahout.vectorizer.term import org.apache.mahout.vectorizer.term.TermCountCombiner; import org.apache.mahout.vectorizer.term.TermCountMapper; import org.apache.mahout.vectorizer.term.TermCountReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class converts a set of input documents in the sequence file format to vectors. The Sequence file @@ -60,6 +62,7 @@ import org.apache.mahout.vectorizer.term * This is a dictionary based Vectorizer. 
*/ public final class DictionaryVectorizer implements Vectorizer { + private static Logger log = LoggerFactory.getLogger(DictionaryVectorizer.class); public static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tf-vectors"; public static final String MIN_SUPPORT = "min.support"; @@ -167,6 +170,7 @@ public final class DictionaryVectorizer int[] maxTermDimension = new int[1]; List dictionaryChunks; + log.info("Creating dictionary from {} and saving at {}", input, dictionaryJobPath); if (maxNGramSize == 1) { startWordCounting(input, dictionaryJobPath, baseConf, minSupport); dictionaryChunks = Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java Sun Jun 9 12:17:45 2013 @@ -17,8 +17,6 @@ package org.apache.mahout.vectorizer; -import java.util.List; - import org.apache.commons.cli2.CommandLine; import org.apache.commons.cli2.Group; import org.apache.commons.cli2.Option; @@ -45,121 +43,123 @@ import org.apache.mahout.vectorizer.tfid import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * Converts a given set of sequence files into SparseVectors */ public final class SparseVectorsFromSequenceFiles extends AbstractJob { - + private static final Logger log = LoggerFactory.getLogger(SparseVectorsFromSequenceFiles.class); - + public static void main(String[] args) throws Exception { ToolRunner.run(new SparseVectorsFromSequenceFiles(), args); } - + @Override public int run(String[] args) throws Exception { DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); ArgumentBuilder abuilder = new ArgumentBuilder(); GroupBuilder gbuilder = new GroupBuilder(); - + Option inputDirOpt = DefaultOptionCreator.inputOption().create(); - + Option outputDirOpt = DefaultOptionCreator.outputOption().create(); - + Option minSupportOpt = obuilder.withLongName("minSupport").withArgument( - abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription( - "(Optional) Minimum Support. Default Value: 2").withShortName("s").create(); - + abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription( + "(Optional) Minimum Support. Default Value: 2").withShortName("s").create(); + Option analyzerNameOpt = obuilder.withLongName("analyzerName").withArgument( - abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription( - "The class name of the analyzer").withShortName("a").create(); - + abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription( + "The class name of the analyzer").withShortName("a").create(); + Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument( - abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription( - "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create(); - + abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription( + "The chunkSize in MegaBytes. 
100-10000 MB").withShortName("chunk").create(); + Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument( - abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription( - "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create(); - + abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription( + "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create(); + Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument( - abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription( - "The minimum document frequency. Default is 1").withShortName("md").create(); + abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription( + "The minimum document frequency. Default is 1").withShortName("md").create(); Option maxDFPercentOpt = obuilder.withLongName("maxDFPercent").withRequired(false).withArgument( - abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription( - "The max percentage of docs for the DF. Can be used to remove really high frequency terms." - + " Expressed as an integer between 0 and 100. Default is 99. If maxDFSigma is also set, " - + "it will override this value.").withShortName("x").create(); + abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription( + "The max percentage of docs for the DF. Can be used to remove really high frequency terms." + + " Expressed as an integer between 0 and 100. Default is 99. If maxDFSigma is also set, " + + "it will override this value.").withShortName("x").create(); Option maxDFSigmaOpt = obuilder.withLongName("maxDFSigma").withRequired(false).withArgument( - abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription( - "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) " - + "of the document frequencies of these vectors. Can be used to remove really high frequency terms." - + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 " - + "no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent").withShortName("xs").create(); - + abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription( + "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) " + + "of the document frequencies of these vectors. Can be used to remove really high frequency terms." + + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 " + + "no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent").withShortName("xs").create(); + Option minLLROpt = obuilder.withLongName("minLLR").withRequired(false).withArgument( - abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription( - "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR) - .withShortName("ml").create(); - + abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription( + "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR) + .withShortName("ml").create(); + Option numReduceTasksOpt = obuilder.withLongName("numReducers").withArgument( - abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription( - "(Optional) Number of reduce tasks. 
Default Value: 1").withShortName("nr").create(); - + abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription( + "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create(); + Option powerOpt = obuilder.withLongName("norm").withRequired(false).withArgument( - abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription( - "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm. " - + "Must be greater or equal to 0. The default is not to normalize").withShortName("n").create(); - + abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription( + "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm. " + + "Must be greater or equal to 0. The default is not to normalize").withShortName("n").create(); + Option logNormalizeOpt = obuilder.withLongName("logNormalize").withRequired(false) - .withDescription( - "(Optional) Whether output vectors should be logNormalize. If set true else false") - .withShortName("lnorm").create(); - + .withDescription( + "(Optional) Whether output vectors should be logNormalize. If set true else false") + .withShortName("lnorm").create(); + Option maxNGramSizeOpt = obuilder.withLongName("maxNGramSize").withRequired(false).withArgument( - abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create()) - .withDescription( - "(Optional) The maximum size of ngrams to create" - + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create(); - + abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create()) + .withDescription( + "(Optional) The maximum size of ngrams to create" + + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create(); + Option sequentialAccessVectorOpt = obuilder.withLongName("sequentialAccessVector").withRequired(false) - .withDescription( - "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false") - .withShortName("seq").create(); - + .withDescription( + "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false") + .withShortName("seq").create(); + Option namedVectorOpt = obuilder.withLongName("namedVector").withRequired(false) - .withDescription( - "(Optional) Whether output vectors should be NamedVectors. If set true else false") - .withShortName("nv").create(); - + .withDescription( + "(Optional) Whether output vectors should be NamedVectors. 
If set true else false") + .withShortName("nv").create(); + Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription( - "If set, overwrite the output directory").withShortName("ow").create(); + "If set, overwrite the output directory").withShortName("ow").create(); Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h") - .create(); - + .create(); + Group group = gbuilder.withName("Options").withOption(minSupportOpt).withOption(analyzerNameOpt) - .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt) - .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt) - .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput) - .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt) - .withOption(logNormalizeOpt) - .create(); + .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt) + .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt) + .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput) + .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt) + .withOption(logNormalizeOpt) + .create(); try { Parser parser = new Parser(); parser.setGroup(group); parser.setHelpOption(helpOpt); CommandLine cmdLine = parser.parse(args); - + if (cmdLine.hasOption(helpOpt)) { CommandLineUtil.printHelp(group); return -1; } - + Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt)); Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt)); - + int chunkSize = 100; if (cmdLine.hasOption(chunkSizeOpt)) { chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt)); @@ -169,9 +169,9 @@ public final class SparseVectorsFromSequ String minSupportString = (String) cmdLine.getValue(minSupportOpt); minSupport = Integer.parseInt(minSupportString); } - + int maxNGramSize = 1; - + if (cmdLine.hasOption(maxNGramSizeOpt)) { try { maxNGramSize = Integer.parseInt(cmdLine.getValue(maxNGramSizeOpt).toString()); @@ -180,17 +180,17 @@ public final class SparseVectorsFromSequ } } log.info("Maximum n-gram size is: {}", maxNGramSize); - + if (cmdLine.hasOption(overwriteOutput)) { HadoopUtil.delete(getConf(), outputDir); } - + float minLLRValue = LLRReducer.DEFAULT_MIN_LLR; if (cmdLine.hasOption(minLLROpt)) { minLLRValue = Float.parseFloat(cmdLine.getValue(minLLROpt).toString()); } log.info("Minimum LLR value: {}", minLLRValue); - + int reduceTasks = 1; if (cmdLine.hasOption(numReduceTasksOpt)) { reduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString()); @@ -205,9 +205,9 @@ public final class SparseVectorsFromSequ // you can't instantiate it AnalyzerUtils.createAnalyzer(analyzerClass); } - + boolean processIdf; - + if (cmdLine.hasOption(weightOpt)) { String wString = cmdLine.getValue(weightOpt).toString(); if ("tf".equalsIgnoreCase(wString)) { @@ -220,7 +220,7 @@ public final class SparseVectorsFromSequ } else { processIdf = true; } - + int minDf = 1; if (cmdLine.hasOption(minDFOpt)) { minDf = Integer.parseInt(cmdLine.getValue(minDFOpt).toString()); @@ -233,7 +233,7 @@ public final class SparseVectorsFromSequ if (cmdLine.hasOption(maxDFSigmaOpt)) { maxDFSigma = Double.parseDouble(cmdLine.getValue(maxDFSigmaOpt).toString()); } - + float norm = PartialVectorMerger.NO_NORMALIZING; if 
(cmdLine.hasOption(powerOpt)) { String power = cmdLine.getValue(powerOpt).toString(); @@ -243,12 +243,12 @@ public final class SparseVectorsFromSequ norm = Float.parseFloat(power); } } - + boolean logNormalize = false; if (cmdLine.hasOption(logNormalizeOpt)) { logNormalize = true; } - + log.info("Tokenizing documents in {}", inputDir); Configuration conf = getConf(); Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER); //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom @@ -264,98 +264,99 @@ public final class SparseVectorsFromSequ if (cmdLine.hasOption(namedVectorOpt)) { namedVectors = true; } - boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00; + boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00; String tfDirName = shouldPrune - ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune" - : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER; - + ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune" + : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER; + log.info("Creating Term Frequency Vectors"); if (processIdf) { DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, - outputDir, - tfDirName, - conf, - minSupport, - maxNGramSize, - minLLRValue, - -1.0f, - false, - reduceTasks, - chunkSize, - sequentialAccessOutput, - namedVectors); + outputDir, + tfDirName, + conf, + minSupport, + maxNGramSize, + minLLRValue, + -1.0f, + false, + reduceTasks, + chunkSize, + sequentialAccessOutput, + namedVectors); } else { DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, - outputDir, - tfDirName, - conf, - minSupport, - maxNGramSize, - minLLRValue, - norm, - logNormalize, - reduceTasks, - chunkSize, - sequentialAccessOutput, - namedVectors); + outputDir, + tfDirName, + conf, + minSupport, + maxNGramSize, + minLLRValue, + norm, + logNormalize, + reduceTasks, + chunkSize, + sequentialAccessOutput, + namedVectors); } Pair> docFrequenciesFeatures = null; // Should document frequency features be processed if (shouldPrune || processIdf) { + log.info("Calculating IDF"); docFrequenciesFeatures = - TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),outputDir, conf, chunkSize); + TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize); } long maxDF = maxDFPercent; //if we are pruning by std dev, then this will get changed if (shouldPrune) { - long vectorCount = docFrequenciesFeatures.getFirst()[1]; - if (maxDFSigma >= 0.0) { - Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER); - Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR); - - // Calculate the standard deviation - double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf); - maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount); - } + long vectorCount = docFrequenciesFeatures.getFirst()[1]; + if (maxDFSigma >= 0.0) { + Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER); + Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR); + + // Calculate the standard deviation + double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf); + maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount); + } - long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f)); + long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f)); // Prune the term frequency vectors Path tfDir = new Path(outputDir, tfDirName); Path prunedTFDir = new Path(outputDir, 
DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER); Path prunedPartialTFDir = - new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial"); - + new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial"); + log.info("Pruning"); if (processIdf) { HighDFWordsPruner.pruneVectors(tfDir, - prunedTFDir, - prunedPartialTFDir, - maxDFThreshold, - minDf, - conf, - docFrequenciesFeatures, - -1.0f, - false, - reduceTasks); + prunedTFDir, + prunedPartialTFDir, + maxDFThreshold, + minDf, + conf, + docFrequenciesFeatures, + -1.0f, + false, + reduceTasks); } else { HighDFWordsPruner.pruneVectors(tfDir, - prunedTFDir, - prunedPartialTFDir, - maxDFThreshold, - minDf, - conf, - docFrequenciesFeatures, - norm, - logNormalize, - reduceTasks); + prunedTFDir, + prunedPartialTFDir, + maxDFThreshold, + minDf, + conf, + docFrequenciesFeatures, + norm, + logNormalize, + reduceTasks); } HadoopUtil.delete(new Configuration(conf), tfDir); } if (processIdf) { TFIDFConverter.processTfIdf( - new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER), - outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize, - sequentialAccessOutput, namedVectors, reduceTasks); + new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER), + outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize, + sequentialAccessOutput, namedVectors, reduceTasks); } } catch (OptionException e) { log.error("Exception", e); @@ -363,5 +364,5 @@ public final class SparseVectorsFromSequ } return 0; } - + } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java Sun Jun 9 12:17:45 2013 @@ -21,6 +21,8 @@ import com.google.common.base.Preconditi import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -40,6 +42,8 @@ import org.apache.mahout.math.VectorWrit import org.apache.mahout.math.map.OpenObjectIntHashMap; import org.apache.mahout.vectorizer.DictionaryVectorizer; import org.apache.mahout.vectorizer.common.PartialVectorMerger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; @@ -49,7 +53,7 @@ import java.util.Iterator; * Converts a document in to a sparse vector */ public class TFPartialVectorReducer extends Reducer { - + private transient static Logger log = LoggerFactory.getLogger(TFPartialVectorReducer.class); private final OpenObjectIntHashMap dictionary = new OpenObjectIntHashMap(); private int dimension; @@ -62,7 +66,7 @@ public class TFPartialVectorReducer exte @Override protected void reduce(Text key, Iterable values, Context context) - throws IOException, InterruptedException { + throws IOException, InterruptedException { Iterator it = values.iterator(); if (!it.hasNext()) { return; @@ -119,7 +123,18 @@ 
public class TFPartialVectorReducer exte Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, "missing paths from the DistributedCache"); - + LocalFileSystem localFs = FileSystem.getLocal(conf); + if (!localFs.exists(localFiles[0])) { + log.info("Can't find dictionary dist. cache file, looking in .getCacheFiles"); + URI[] filesURIs = DistributedCache.getCacheFiles(conf); + if (filesURIs == null) { + throw new IOException("Cannot read Frequency list from Distributed Cache"); + } + if (filesURIs.length != 1) { + throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')'); + } + localFiles[0] = new Path(filesURIs[0].getPath()); + } dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE); sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false); namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false); Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun 9 12:17:45 2013 @@ -24,6 +24,8 @@ import java.util.Iterator; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -103,7 +105,17 @@ public class TFIDFPartialVectorReducer e Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, "missing paths from the DistributedCache"); - + LocalFileSystem localFs = FileSystem.getLocal(conf); + if (!localFs.exists(localFiles[0])) { + URI[] filesURIs = DistributedCache.getCacheFiles(conf); + if (filesURIs == null) { + throw new IOException("Cannot read Frequency list from Distributed Cache"); + } + if (filesURIs.length != 1) { + throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')'); + } + localFiles[0] = new Path(filesURIs[0].getPath()); + } vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1); featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1); minDf = conf.getInt(TFIDFConverter.MIN_DF, 1); Modified: mahout/trunk/examples/bin/cluster-reuters.sh URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/cluster-reuters.sh?rev=1491191&r1=1491190&r2=1491191&view=diff ============================================================================== --- mahout/trunk/examples/bin/cluster-reuters.sh (original) +++ mahout/trunk/examples/bin/cluster-reuters.sh Sun Jun 9 12:17:45 2013 @@ -90,15 +90,19 @@ if [ ! 
-e ${WORK_DIR}/reuters-out-seqdir tar xzf ${WORK_DIR}/reuters21578.tar.gz -C ${WORK_DIR}/reuters-sgm fi + echo "Extracting Reuters" + $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out if [ "$HADOOP_HOME" != "" ] && [ "$MAHOUT_LOCAL" == "" ] ; then + echo "Copying Reuters data to Hadoop" set +e $HADOOP dfs -rmr ${WORK_DIR}/reuters-sgm + $HADOOP dfs -rmr ${WORK_DIR}/reuters-out set -e - $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm - fi - $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out + $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm + $HADOOP dfs -put ${WORK_DIR}/reuters-out ${WORK_DIR}/reuters-out + fi fi - + echo "Converting to Sequence Files from Directory" $MAHOUT seqdirectory -i ${WORK_DIR}/reuters-out -o ${WORK_DIR}/reuters-out-seqdir -c UTF-8 -chunk 5 fi
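
A note on the partitioning scheme introduced above: ClusterCountReader.getClusterIDs renumbers the arbitrary final cluster ids into the dense range 0..k-1. ClusterOutputPostProcessorMapper emits that dense index as an IntWritable key, and because the job sets numReduceTasks to k and Hadoop's default HashPartitioner computes hash(key) % numReduceTasks (an IntWritable hashes to its int value), each cluster lands on exactly one reducer and therefore in exactly one part file; ClusterOutputPostProcessorReducer then restores the original cluster id from the reverse mapping before writing. The sketch below illustrates just that arithmetic in plain Java; it is not the Mahout code itself, and the class name and sample cluster ids are made up for illustration.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal sketch (hypothetical class, made-up cluster ids) of the remapping used by
 * the post-processor: arbitrary cluster ids are renumbered 0..k-1 so that the default
 * HashPartitioner (hash(key) % numReduceTasks) routes each cluster to its own reducer.
 */
public class ClusterPartitioningSketch {

  public static void main(String[] args) {
    // Stand-in for the ids read from the clusters-*-final directory
    // (what ClusterCountReader.getClusterIDs does against real cluster output).
    int[] finalClusterIds = {23, 7, 112, 4};        // k = 4 clusters with arbitrary ids
    int numReduceTasks = finalClusterIds.length;    // mirrors job.setNumReduceTasks(k)

    // keyIsClusterId == true: cluster id -> dense index (used on the map side)
    Map<Integer, Integer> clusterToIndex = new HashMap<>();
    // keyIsClusterId == false: dense index -> cluster id (used on the reduce side)
    Map<Integer, Integer> indexToCluster = new LinkedHashMap<>();
    for (int i = 0; i < finalClusterIds.length; i++) {
      clusterToIndex.put(finalClusterIds[i], i);
      indexToCluster.put(i, finalClusterIds[i]);
    }

    for (int clusterId : finalClusterIds) {
      int mapOutputKey = clusterToIndex.get(clusterId);   // key the mapper would emit
      // Default HashPartitioner: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      // an IntWritable's hashCode is its int value, so this is mapOutputKey % k.
      int reducer = (Integer.hashCode(mapOutputKey) & Integer.MAX_VALUE) % numReduceTasks;
      int restoredId = indexToCluster.get(mapOutputKey);  // id the reducer writes back out
      System.out.printf("cluster %d -> map key %d -> reducer %d -> restored id %d%n",
          clusterId, mapOutputKey, reducer, restoredId);
    }
  }
}

Because the remapped keys are exactly 0..k-1 and numReduceTasks is k, key % k is the identity mapping, so no two clusters share a reducer and no reducer sits idle; that is why the reducer count is taken from getNumberOfClusters before the job is configured.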