mahout-commits mailing list archives

From: robina...@apache.org
Subject: svn commit: r906869 - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ examples/src/main/java/org/apache/mahout/text/ utils/src/main/java/org/apache/mahout/text/ utils/src/main/java/org/apache/mahout/utils/clustering/ utils/src/mai...
Date: Fri, 05 Feb 2010 09:27:05 GMT
Author: robinanil
Date: Fri Feb  5 09:27:04 2010
New Revision: 906869

URL: http://svn.apache.org/viewvc?rev=906869&view=rev
Log:
MAHOUT-237 DictionaryVectorizer in separate classes

Added:
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
Removed:
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentTokenizerMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
    lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Fri Feb  5 09:27:04 2010
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.common;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,16 +28,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-
 public final class HadoopUtil {
-
+  
   private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);
-
-  private HadoopUtil() {
-  }
-
+  
+  private HadoopUtil() {}
+  
   public static void overwriteOutput(String output) throws IOException {
     Configuration conf = new JobConf(KMeansDriver.class);
     Path outPath = new Path(output);
@@ -43,7 +42,30 @@
       log.warn("Deleting {}", outPath);
       fs.delete(outPath, true);
     }
+    log.warn("Creating dir {}", outPath);
     fs.mkdirs(outPath);
-
+  }
+  
+  public static void deletePath(String output, FileSystem fs) throws IOException {
+    Path outPath = new Path(output);
+    if (fs.exists(outPath)) {
+      log.warn("Deleting {}", outPath);
+      fs.delete(outPath, true);
+    }
+  }
+  
+  public static void deletePaths(List<Path> paths, FileSystem fs) throws IOException {
+    for (int i = 0; i < paths.size(); i++) {
+      Path path = paths.get(i);
+      if (fs.exists(path)) {
+        log.warn("Deleting {}", path);
+        fs.delete(path, true);
+      }
+    }
+  }
+  
+  public static void rename(Path from, Path to, FileSystem fs) throws IOException {
+    log.warn("Renaming " + from.toUri() + " to " + to.toUri());
+    fs.rename(from, to);
   }
 }
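
For context, these new helpers (deletePath, deletePaths, rename) are what the vectorizer drivers below use to clean up and promote intermediate job outputs. A minimal driver-side sketch of how they might be called; the /tmp/mahout-demo paths are hypothetical:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.common.HadoopUtil;

    public class HadoopUtilUsageSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path base = new Path("/tmp/mahout-demo");          // hypothetical work dir
        FileSystem fs = FileSystem.get(base.toUri(), conf);

        // Delete a stale copy of the final output, then promote a lone partial
        // output to that location (what DictionaryVectorizer now does when only
        // one dictionary chunk was produced).
        HadoopUtil.deletePath(base + "/vectors", fs);
        HadoopUtil.rename(new Path(base, "partial-vectors-0"),
          new Path(base, "vectors"), fs);

        // Or delete a whole batch of intermediate outputs in one call.
        List<Path> partials = Arrays.asList(
          new Path(base, "partial-vectors-1"),
          new Path(base, "partial-vectors-2"));
        HadoopUtil.deletePaths(partials, fs);
      }
    }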

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java Fri Feb  5 09:27:04 2010
@@ -24,7 +24,7 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -133,21 +133,20 @@
   public void readFields(DataInput in) throws IOException {
     int len = in.readInt();
     tuple = new ArrayList<String>(len);
+    Text value = new Text();
     for (int i = 0; i < len; i++) {
-      int fieldLen = in.readInt();
-      byte[] entry = new byte[fieldLen];
-      in.readFully(entry);
-      tuple.add(Bytes.toString(entry));
+      value.readFields(in);
+      tuple.add(value.toString());
     }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(tuple.size());
+    Text value = new Text();
     for (String entry : tuple) {
-      byte[] data = Bytes.toBytes(entry);
-      out.writeInt(data.length);
-      out.write(data);
+      value.set(entry);
+      value.write(out);
     }
   }
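
The rewritten readFields/write above delegate per-entry framing to Hadoop's Text (length-prefixed UTF-8), which is what lets the HBase Bytes dependency go away. A quick round-trip sketch, assuming only the StringTuple API already used elsewhere in this commit (no-arg constructor, add, getEntries):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.mahout.common.StringTuple;

    public class StringTupleRoundTrip {
      public static void main(String[] args) throws IOException {
        StringTuple original = new StringTuple();
        original.add("hello");
        original.add("wörld");          // non-ASCII survives: Text writes UTF-8

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        StringTuple copy = new StringTuple();
        copy.readFields(new DataInputStream(
          new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getEntries());   // [hello, wörld]
      }
    }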
 

Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java Fri Feb  5 09:27:04 2010
@@ -20,64 +20,98 @@
 import org.apache.commons.cli2.CommandLine;
 import org.apache.commons.cli2.Group;
 import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
 import org.apache.commons.cli2.builder.ArgumentBuilder;
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.utils.vectors.common.PartialVectorMerger;
+import org.apache.mahout.utils.vectors.lucene.LuceneIterable;
 import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
+import org.apache.mahout.utils.vectors.text.DocumentProcessor;
+import org.apache.mahout.utils.vectors.tfidf.TFIDFConverter;
 
 /**
  * Converts a given set of sequence files into SparseVectors
  * 
  */
 public final class SparseVectorsFromSequenceFiles {
-
-  private SparseVectorsFromSequenceFiles() {
-  }
-
+  
+  private SparseVectorsFromSequenceFiles() {}
+  
   public static void main(String[] args) throws Exception {
     DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
     ArgumentBuilder abuilder = new ArgumentBuilder();
     GroupBuilder gbuilder = new GroupBuilder();
     
-    Option inputDirOpt =
-        obuilder.withLongName("input").withRequired(true).withArgument(
-            abuilder.withName("input").withMinimum(1).withMaximum(1).create())
-            .withDescription(
-                "input dir containing the documents in sequence file format")
-            .withShortName("i").create();
-    
-    Option outputDirOpt =
-        obuilder.withLongName("outputDir").withRequired(true).withArgument(
-            abuilder.withName("outputDir").withMinimum(1).withMaximum(1)
-                .create()).withDescription("The output directory")
-            .withShortName("o").create();
-    Option minSupportOpt =
-        obuilder.withLongName("minSupport").withArgument(
-            abuilder.withName("minSupport").withMinimum(1).withMaximum(1)
-                .create()).withDescription(
-            "(Optional) Minimum Support. Default Value: 2").withShortName("s")
-            .create();
-    
-    Option analyzerNameOpt =
-        obuilder.withLongName("analyzerName").withArgument(
-            abuilder.withName("analyzerName").withMinimum(1).withMaximum(1)
-                .create()).withDescription("The class name of the analyzer")
-            .withShortName("a").create();
-    
-    Option chunkSizeOpt =
-        obuilder.withLongName("chunkSize").withArgument(
-            abuilder.withName("chunkSize").withMinimum(1).withMaximum(1)
-                .create()).withDescription(
-            "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk")
-            .create();
-    
-    Group group =
-        gbuilder.withName("Options").withOption(minSupportOpt).withOption(
-            analyzerNameOpt).withOption(chunkSizeOpt).withOption(outputDirOpt)
-            .withOption(inputDirOpt).create();
+    Option inputDirOpt = obuilder.withLongName("input").withRequired(true)
+        .withArgument(
+          abuilder.withName("input").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+          "input dir containing the documents in sequence file format")
+        .withShortName("i").create();
+    
+    Option outputDirOpt = obuilder
+        .withLongName("outputDir")
+        .withRequired(true)
+        .withArgument(
+          abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create())
+        .withDescription("The output directory").withShortName("o").create();
+    Option minSupportOpt = obuilder.withLongName("minSupport").withArgument(
+      abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create())
+        .withDescription("(Optional) Minimum Support. Default Value: 2")
+        .withShortName("s").create();
+    
+    Option analyzerNameOpt = obuilder.withLongName("analyzerName")
+        .withArgument(
+          abuilder.withName("analyzerName").withMinimum(1).withMaximum(1)
+              .create()).withDescription("The class name of the analyzer")
+        .withShortName("a").create();
+    
+    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
+      abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
+        .withDescription("The chunkSize in MegaBytes. 100-10000 MB")
+        .withShortName("chunk").create();
+    
+    Option weightOpt = obuilder.withLongName("weight").withRequired(false)
+        .withArgument(
+          abuilder.withName("weight").withMinimum(1).withMaximum(1).create())
+        .withDescription("The kind of weight to use. Currently TF or TFIDF")
+        .withShortName("w").create();
+    
+    Option minDFOpt = obuilder.withLongName("minDF").withRequired(false)
+        .withArgument(
+          abuilder.withName("minDF").withMinimum(1).withMaximum(1).create())
+        .withDescription("The minimum document frequency.  Default is 1")
+        .withShortName("md").create();
+    
+    Option maxDFPercentOpt = obuilder
+        .withLongName("maxDFPercent")
+        .withRequired(false)
+        .withArgument(
+          abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1)
+              .create())
+        .withDescription(
+          "The max percentage of docs for the DF.  Can be used to remove really high frequency terms.  Expressed as an integer between 0 and 100. Default is 99.")
+        .withShortName("x").create();
+    Option powerOpt = obuilder
+        .withLongName("norm")
+        .withRequired(false)
+        .withArgument(
+          abuilder.withName("norm").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+          "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm.  "
+              + "Must be greater or equal to 0.  The default is not to normalize")
+        .withShortName("n").create();
+    
+    Group group = gbuilder.withName("Options").withOption(minSupportOpt)
+        .withOption(analyzerNameOpt).withOption(chunkSizeOpt).withOption(
+          outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
+        .withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
+        .create();
     
     Parser parser = new Parser();
     parser.setGroup(group);
@@ -95,7 +129,7 @@
       String minSupportString = (String) cmdLine.getValue(minSupportOpt);
       minSupport = Integer.parseInt(minSupportString);
     }
-    String analyzerName = (String) cmdLine.getValue(analyzerNameOpt);
+    
     Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
     if (cmdLine.hasOption(analyzerNameOpt)) {
       String className = cmdLine.getValue(analyzerNameOpt).toString();
@@ -104,7 +138,51 @@
       // you can't instantiate it
       analyzerClass.newInstance();
     }
-    DictionaryVectorizer.createTermFrequencyVectors(inputDir, outputDir,
-        analyzerClass, minSupport, chunkSize);
+    
+    boolean processIdf = false;
+    
+    if (cmdLine.hasOption(weightOpt)) {
+      String wString = cmdLine.getValue(weightOpt).toString();
+      if (wString.equalsIgnoreCase("tf")) {
+        processIdf = false;
+      } else if (wString.equalsIgnoreCase("tfidf")) {
+        processIdf = true;
+      } else {
+        throw new OptionException(weightOpt);
+      }
+    } else {
+      processIdf = true;
+    }
+    
+    int minDf = 1;
+    if (cmdLine.hasOption(minDFOpt)) {
+      minDf = Integer.parseInt(cmdLine.getValue(minDFOpt).toString());
+    }
+    int maxDFPercent = 99;
+    if (cmdLine.hasOption(maxDFPercentOpt)) {
+      maxDFPercent = Integer.parseInt(cmdLine.getValue(maxDFPercentOpt)
+          .toString());
+    }
+    
+    float norm = PartialVectorMerger.NO_NORMALIZING;
+    if (cmdLine.hasOption(powerOpt)) {
+      String power = cmdLine.getValue(powerOpt).toString();
+      if (power.equals("INF")) {
+        norm = Float.POSITIVE_INFINITY;
+      } else {
+        norm = Float.parseFloat(power);
+      }
+    }
+    HadoopUtil.overwriteOutput(outputDir);
+    String tokenizedPath = outputDir + "/tokenized-documents";
+    DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath);
+    
+    DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
+      outputDir, minSupport, chunkSize);
+    if (processIdf) {
+      TFIDFConverter.processTfIdf(
+        outputDir + DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER,
+        outputDir + "/tfidf", chunkSize, minDf, maxDFPercent, norm);
+    }
   }
 }
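
Taken together, the driver now runs a three-stage pipeline: tokenize the raw documents, build term-frequency vectors against a dictionary, and optionally re-weight them as TF-IDF. A condensed programmatic sketch of the same calls made by main() above, with hypothetical paths and the default values from the option descriptions:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.mahout.common.HadoopUtil;
    import org.apache.mahout.utils.vectors.common.PartialVectorMerger;
    import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
    import org.apache.mahout.utils.vectors.text.DocumentProcessor;
    import org.apache.mahout.utils.vectors.tfidf.TFIDFConverter;

    public class VectorizationPipelineSketch {
      public static void main(String[] args) throws Exception {
        String inputDir = "/tmp/sequence-docs";    // SequenceFile<Text,Text> of raw docs
        String outputDir = "/tmp/sparse-vectors";  // hypothetical output root

        HadoopUtil.overwriteOutput(outputDir);

        // 1. Tokenize raw documents into StringTuples.
        String tokenizedPath = outputDir + "/tokenized-documents";
        DocumentProcessor.tokenizeDocuments(inputDir, StandardAnalyzer.class,
          tokenizedPath);

        // 2. Build term-frequency vectors (minSupport = 2, 100 MB dictionary chunks).
        DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir,
          2, 100);

        // 3. Optionally re-weight with TF-IDF (minDf = 1, maxDFPercent = 99,
        //    no output normalization).
        TFIDFConverter.processTfIdf(
          outputDir + DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER,
          outputDir + "/tfidf", 100, 1, 99, PartialVectorMerger.NO_NORMALIZING);
      }
    }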

Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java Fri Feb  5 09:27:04 2010
@@ -201,7 +201,7 @@
     Option helpOpt = obuilder.withLongName("help").
             withDescription("Print out help").withShortName("h").create();
 
-    Group group = gbuilder.withName("Options").withOption(seqOpt).withOption(outputOpt).withOption(substringOpt).withOption(pointsOpt).withOption(centroidJSonOpt).withOption(dictOpt).create();
+    Group group = gbuilder.withName("Options").withOption(helpOpt).withOption(seqOpt).withOption(outputOpt).withOption(substringOpt).withOption(pointsOpt).withOption(centroidJSonOpt).withOption(dictOpt).create();
 
     
     try {

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Merges partial vectors into a full sparse vector
+ */
+public class PartialVectorMergeReducer extends MapReduceBase
+    implements
+    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+  
+  private final VectorWritable vectorWritable = new VectorWritable();
+  
+  private double normPower;
+  
+  @Override
+  public void reduce(WritableComparable<?> key,
+                     Iterator<VectorWritable> values,
+                     OutputCollector<WritableComparable<?>,VectorWritable> output,
+                     Reporter reporter) throws IOException {
+    
+    Vector vector = new RandomAccessSparseVector(key
+        .toString(), Integer.MAX_VALUE, 10);
+    while (values.hasNext()) {
+      VectorWritable value = values.next();
+      value.get().addTo(vector);
+    }
+    if (normPower != PartialVectorMerger.NO_NORMALIZING) {
+      vector = vector.normalize(normPower);
+    }
+    vectorWritable.set(vector);
+    output.collect(key, vectorWritable);
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    normPower = job.getFloat(PartialVectorMerger.NORMALIZATION_POWER,
+      PartialVectorMerger.NO_NORMALIZING);
+  }
+}
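
A small non-MapReduce sketch of the merge-then-normalize logic this reducer applies, using the same Mahout vector calls (addTo, normalize); the indices and counts are made up:

    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;

    public class MergeLogicSketch {
      public static void main(String[] args) {
        // Two partial vectors for the same document, built from different
        // dictionary chunks, so they populate disjoint index ranges.
        Vector partialA = new RandomAccessSparseVector("doc-1", 10, 10);
        partialA.setQuick(0, 2.0);
        partialA.setQuick(3, 1.0);
        Vector partialB = new RandomAccessSparseVector("doc-1", 10, 10);
        partialB.setQuick(7, 4.0);

        // The same accumulation the reducer performs via value.get().addTo(vector).
        Vector merged = new RandomAccessSparseVector("doc-1", 10, 10);
        partialA.addTo(merged);
        partialB.addTo(merged);

        // When normalization.power is set to a value >= 0 the reducer also normalizes.
        Vector normalized = merged.normalize(2);

        System.out.println(merged.getQuick(7));      // 4.0
        System.out.println(normalized.getQuick(7));  // 4.0 / sqrt(4 + 1 + 16)
      }
    }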

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * This class merges a set of partial input vectors. The Sequence file input
+ * should have a {@link WritableComparable} key containing the document id and a
+ * {@link VectorWritable} value containing the term frequency vector. This class
+ * also does normalization of the vector.
+ * 
+ */
+public final class PartialVectorMerger {
+  
+  public static final float NO_NORMALIZING = -1.0f;
+  
+  public static final String NORMALIZATION_POWER = "normalization.power";
+  
+  /**
+   * Cannot be initialized. Use the static functions
+   */
+  private PartialVectorMerger() {
+
+  }
+  
+  /**
+   * Merge all the partial
+   * {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete
+   * Document {@link org.apache.mahout.math.RandomAccessSparseVector}
+   * 
+   * @param partialVectorPaths
+   *          input directory of the vectors in {@link SequenceFile} format
+   * @param output
+   *          output directory where the merged vectors have to be created
+   * @param normPower
+   *          The normalization value. Must be greater than or equal to 0 or
+   *          equal to {@link #NO_NORMALIZING}
+   * @throws IOException
+   */
+  public static void mergePartialVectors(List<Path> partialVectorPaths,
+                                         String output,
+                                         float normPower) throws IOException {
+    if (normPower != NO_NORMALIZING && normPower < 0) {
+      throw new IllegalArgumentException("normPower must either be -1 or >= 0");
+    }
+    
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(PartialVectorMerger.class);
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set to enable serialisation of conf values
+    conf.setJobName("PartialVectorMerger::MergePartialVectors");
+    
+    conf.setFloat(NORMALIZATION_POWER, normPower);
+    
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    
+    FileInputFormat.setInputPaths(conf,
+      getCommaSeparatedPaths(partialVectorPaths));
+    
+    Path outputPath = new Path(output);
+    FileOutputFormat.setOutputPath(conf, outputPath);
+    
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setReducerClass(PartialVectorMergeReducer.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    FileSystem dfs = FileSystem.get(outputPath.toUri(), conf);
+    if (dfs.exists(outputPath)) {
+      dfs.delete(outputPath, true);
+    }
+    
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+  
+  private static String getCommaSeparatedPaths(List<Path> paths) {
+    StringBuilder commaSeparatedPaths = new StringBuilder();
+    String sep = "";
+    for (Path path : paths) {
+      commaSeparatedPaths.append(sep).append(path.toString());
+      sep = ",";
+    }
+    return commaSeparatedPaths.toString();
+  }
+}
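
A sketch of calling the merger directly, assuming the partial-vector directories already exist as SequenceFile<Text,VectorWritable> output of earlier jobs; the paths are hypothetical:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.utils.vectors.common.PartialVectorMerger;

    public class MergeDriverSketch {
      public static void main(String[] args) throws Exception {
        List<Path> partials = Arrays.asList(
          new Path("/tmp/work/partial-vectors-0"),
          new Path("/tmp/work/partial-vectors-1"));

        // Merge into one vector directory, L2-normalizing each document vector;
        // pass PartialVectorMerger.NO_NORMALIZING to skip normalization.
        PartialVectorMerger.mergePartialVectors(partials, "/tmp/work/vectors", 2.0f);
      }
    }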

Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java Fri Feb  5 09:27:04 2010
@@ -19,8 +19,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,29 +39,29 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.utils.vectors.common.PartialVectorMergeReducer;
+import org.apache.mahout.utils.vectors.common.PartialVectorMerger;
+import org.apache.mahout.utils.vectors.text.term.TFPartialVectorReducer;
+import org.apache.mahout.utils.vectors.text.term.TermCountMapper;
+import org.apache.mahout.utils.vectors.text.term.TermCountReducer;
 
 /**
  * This class converts a set of input documents in the sequence file format to
  * vectors. The Sequence file input should have a {@link Text} key containing
- * the unique document identifier and a {@link Text} value containing the whole
- * document. The document should be stored in UTF-8 encoding which is
- * recognizable by hadoop. It uses the given {@link Analyzer} to process the
- * document into {@link org.apache.lucene.analysis.Token}s. This is a dictionary
- * based Vectorizer.
+ * the unique document identifier and a {@link StringTuple} value containing the
+ * tokenized document. You may use {@link DocumentProcessor} to tokenize the
+ * document. This is a dictionary based Vectorizer.
  * 
  */
 public final class DictionaryVectorizer {
   
-  public static final String ANALYZER_CLASS = "AnalyzerClass";
-  
-  public static final Charset CHARSET = Charset.forName("UTF-8");
+  public static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "/vectors";
   
   private static final String DICTIONARY_FILE = "/dictionary.file-";
   
-  private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "/vectors";
-  
   private static final String FREQUENCY_FILE = "/frequency.file-";
   
   private static final int MAX_CHUNKSIZE = 10000;
@@ -94,10 +92,9 @@
    * @param input
    *          input directory of the documents in {@link SequenceFile} format
    * @param output
-   *          output directory where {@link org.apache.mahout.math.RandomAccessSparseVector}'s of the document are
-   *          generated
-   * @param analyzerClass
-   *          the Lucene {@link Analyzer} used to tokenize the UTF-8
+   *          output directory where
+   *          {@link org.apache.mahout.math.RandomAccessSparseVector}'s of the
+   *          document are generated
    * @param minSupport
    *          the minimum frequency of the feature in the entire corpus to be
    *          considered for inclusion in the sparse vector
@@ -110,18 +107,11 @@
    *          that two simultaneous reducers can create partial vectors without
    *          thrashing the system due to increased swapping
    * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   * @throws URISyntaxException
    */
   public static void createTermFrequencyVectors(String input,
                                                 String output,
-                                                Class<? extends Analyzer> analyzerClass,
                                                 int minSupport,
-                                                int chunkSizeInMegabytes) throws IOException,
-                                                                         InterruptedException,
-                                                                         ClassNotFoundException,
-                                                                         URISyntaxException {
+                                                int chunkSizeInMegabytes) throws IOException {
     if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
       chunkSizeInMegabytes = MIN_CHUNKSIZE;
     } else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
@@ -131,23 +121,32 @@
     Path inputPath = new Path(input);
     Path wordCountPath = new Path(output + WORDCOUNT_OUTPUT_FOLDER);
     
-    startWordCounting(inputPath, analyzerClass, wordCountPath);
-    List<Path> dictionaryChunks =
-        createDictionaryChunks(minSupport, wordCountPath, output,
-            chunkSizeInMegabytes);
+    startWordCounting(inputPath, wordCountPath);
+    List<Path> dictionaryChunks = createDictionaryChunks(minSupport,
+      wordCountPath, output, chunkSizeInMegabytes);
     
     int partialVectorIndex = 0;
     List<Path> partialVectorPaths = new ArrayList<Path>();
     for (Path dictionaryChunk : dictionaryChunks) {
-      Path partialVectorOutputPath =
-          getPath(output + VECTOR_OUTPUT_FOLDER, partialVectorIndex++);
+      Path partialVectorOutputPath = getPath(output + VECTOR_OUTPUT_FOLDER,
+        partialVectorIndex++);
       partialVectorPaths.add(partialVectorOutputPath);
-      makePartialVectors(input, dictionaryChunk, analyzerClass,
-          partialVectorOutputPath);
+      makePartialVectors(input, dictionaryChunk, partialVectorOutputPath);
     }
     
-    createVectorFromPartialVectors(partialVectorPaths, output
-        + DOCUMENT_VECTOR_OUTPUT_FOLDER);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
+    
+    String outputDir = output + DOCUMENT_VECTOR_OUTPUT_FOLDER;
+    if (dictionaryChunks.size() > 1) {
+      PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir,
+        -1);
+      HadoopUtil.deletePaths(partialVectorPaths, fs);
+    } else {
+      Path singlePartialVectorOutputPath = partialVectorPaths.get(0);
+      HadoopUtil.deletePath(outputDir, fs);
+      HadoopUtil.rename(singlePartialVectorOutputPath, new Path(outputDir), fs);
+    }
   }
   
   /**
@@ -171,26 +170,23 @@
     Configuration conf = new Configuration();
     
     FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);
-    FileStatus[] outputFiles =
-        fs
-            .globStatus(new Path(wordCountPath.toString()
-                + OUTPUT_FILES_PATTERN));
-
+    FileStatus[] outputFiles = fs.globStatus(new Path(wordCountPath.toString()
+                                                      + OUTPUT_FILES_PATTERN));
+    
     long chunkSizeLimit = chunkSizeInMegabytes * 1024 * 1024;
     int chunkIndex = 0;
     Path chunkPath = getPath(dictionaryPathBase + DICTIONARY_FILE, chunkIndex);
     chunkPaths.add(chunkPath);
     
-    SequenceFile.Writer writer =
-        new SequenceFile.Writer(fs, conf, chunkPath, Text.class,
-            LongWritable.class);
-    
-    SequenceFile.Writer freqWriter =
-        new SequenceFile.Writer(fs, conf, getPath(dictionaryPathBase
-            + FREQUENCY_FILE, chunkIndex), Text.class, LongWritable.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, chunkPath,
+        Text.class, LongWritable.class);
+    
+    SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf, getPath(
+      dictionaryPathBase + FREQUENCY_FILE, chunkIndex), Text.class,
+        LongWritable.class);
     
     long currentChunkSize = 0;
-
+    
     long i = 0;
     for (FileStatus fileStatus : outputFiles) {
       Path path = fileStatus.getPath();
@@ -209,19 +205,16 @@
           chunkPath = getPath(dictionaryPathBase + DICTIONARY_FILE, chunkIndex);
           chunkPaths.add(chunkPath);
           
-          writer =
-              new SequenceFile.Writer(fs, conf, chunkPath, Text.class,
-                  LongWritable.class);
-          freqWriter =
-              new SequenceFile.Writer(fs, conf, getPath(dictionaryPathBase
-                  + FREQUENCY_FILE, chunkIndex), Text.class, LongWritable.class);
+          writer = new SequenceFile.Writer(fs, conf, chunkPath, Text.class,
+              LongWritable.class);
+          freqWriter = new SequenceFile.Writer(fs, conf, getPath(
+            dictionaryPathBase + FREQUENCY_FILE, chunkIndex), Text.class,
+              LongWritable.class);
           currentChunkSize = 0;
         }
         
-        int fieldSize =
-            SEQUENCEFILE_BYTE_OVERHEAD
-                + (key.toString().length() * 2)
-                + (Long.SIZE / 8);
+        int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD
+                        + (key.toString().length() * 2) + (Long.SIZE / 8);
         currentChunkSize += fieldSize;
         writer.append(key, new LongWritable(i++));
         freqWriter.append(key, value);
@@ -234,67 +227,7 @@
     return chunkPaths;
   }
   
-  /**
-   * Merge all the partial {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete Document
-   * {@link org.apache.mahout.math.RandomAccessSparseVector}
-   * 
-   * @param partialVectorPaths
-   *          input directory of the documents in {@link SequenceFile} format
-   * @param output
-   *          output directory were the partial vectors have to be created
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   * @throws URISyntaxException
-   */
-  private static void createVectorFromPartialVectors(List<Path> partialVectorPaths,
-                                                     String output) throws IOException {
-    
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(DictionaryVectorizer.class);
-    conf.set("io.serializations",
-        "org.apache.hadoop.io.serializer.JavaSerialization,"
-            + "org.apache.hadoop.io.serializer.WritableSerialization");
-    // this conf parameter needs to be set enable serialisation of conf values
-    conf
-        .setJobName("DictionaryVectorizer Vector generator to group Partial Vectors");
-    
-    conf.setMapOutputKeyClass(Text.class);
-    conf.setMapOutputValueClass(VectorWritable.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(VectorWritable.class);
-    
-    FileInputFormat.setInputPaths(conf,
-        getCommaSeparatedPaths(partialVectorPaths));
-    
-    Path outputPath = new Path(output);
-    FileOutputFormat.setOutputPath(conf, outputPath);
-    
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setReducerClass(PartialVectorMerger.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    
-    FileSystem dfs = FileSystem.get(outputPath.toUri(), conf);
-    if (dfs.exists(outputPath)) {
-      dfs.delete(outputPath, true);
-    }
-    
-    client.setConf(conf);
-    JobClient.runJob(conf);
-  }
-  
-  private static String getCommaSeparatedPaths(List<Path> paths) {
-    StringBuilder commaSeparatedPaths = new StringBuilder();
-    String sep = "";
-    for (Path path : paths) {
-      commaSeparatedPaths.append(sep).append(path.toString());
-      sep = ",";
-    }
-    return commaSeparatedPaths.toString();
-  }
-  
-  public static Path getPath(String basePath, int index) {
+  private static Path getPath(String basePath, int index) {
     return new Path(basePath + index);
   }
   
@@ -308,33 +241,24 @@
    *          location of the chunk of features and the id's
    * @param output
    *          output directory were the partial vectors have to be created
-   * @param analyzerClass
-   *          The Lucene {@link Analyzer} for tokenizing the text
    * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   * @throws URISyntaxException
    */
   private static void makePartialVectors(String input,
                                          Path dictionaryFilePath,
-                                         Class<? extends Analyzer> analyzerClass,
                                          Path output) throws IOException {
     
     Configurable client = new JobClient();
     JobConf conf = new JobConf(DictionaryVectorizer.class);
     conf.set("io.serializations",
-        "org.apache.hadoop.io.serializer.JavaSerialization,"
-            + "org.apache.hadoop.io.serializer.WritableSerialization");
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
     // this conf parameter needs to be set to enable serialisation of conf values
     
-    conf.set(ANALYZER_CLASS, analyzerClass.getName());
-    conf.setJobName("DictionaryVectorizer Partial Vector running over input: "
-        + input
-        + " using dictionary file"
-        + dictionaryFilePath.toString());
-    
+    conf.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: "
+                    + input + ", dictionary-file: "
+                    + dictionaryFilePath.toString());
     conf.setMapOutputKeyClass(Text.class);
-    conf.setMapOutputValueClass(Text.class);
+    conf.setMapOutputValueClass(StringTuple.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(VectorWritable.class);
     DistributedCache
@@ -343,9 +267,9 @@
     
     FileOutputFormat.setOutputPath(conf, output);
     
-    conf.setMapperClass(DocumentTokenizerMapper.class);
+    conf.setMapperClass(IdentityMapper.class);
     conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setReducerClass(PartialVectorGenerator.class);
+    conf.setReducerClass(TFPartialVectorReducer.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
     FileSystem dfs = FileSystem.get(output.toUri(), conf);
     if (dfs.exists(output)) {
@@ -360,20 +284,17 @@
    * Count the frequencies of words in parallel using Map/Reduce. The input
    * documents have to be in {@link SequenceFile} format
    */
-  private static void startWordCounting(Path input,
-                                        Class<? extends Analyzer> analyzerClass,
-                                        Path output) throws IOException {
+  private static void startWordCounting(Path input, Path output) throws IOException {
     
     Configurable client = new JobClient();
     JobConf conf = new JobConf(DictionaryVectorizer.class);
     conf.set("io.serializations",
-        "org.apache.hadoop.io.serializer.JavaSerialization,"
-            + "org.apache.hadoop.io.serializer.WritableSerialization");
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
     // this conf parameter needs to be set to enable serialisation of conf values
     
-    conf.set(ANALYZER_CLASS, analyzerClass.getName());
-    conf.setJobName("DictionaryVectorizer Word Count running over input: "
-        + input.toString());
+    conf.setJobName("DictionaryVectorizer::WordCount: input-folder: "
+                    + input.toString());
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(LongWritable.class);
     

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.utils.vectors.text.document.SequenceFileTokenizerMapper;
+
+/**
+ * This class converts a set of input documents in the sequence file format
+ * into {@link StringTuple}s. The {@link SequenceFile} input should have a
+ * {@link Text} key containing the unique document identifier and a {@link Text}
+ * value containing the whole document. The document should be stored in UTF-8
+ * encoding which is recognizable by hadoop. It uses the given {@link Analyzer}
+ * to process the document into {@link org.apache.lucene.analysis.Token}s.
+ * 
+ */
+public final class DocumentProcessor {
+  
+  public static final String ANALYZER_CLASS = "analyzer.class";
+  
+  public static final Charset CHARSET = Charset.forName("UTF-8");
+  
+  /**
+   * Cannot be initialized. Use the static functions
+   */
+  private DocumentProcessor() {
+
+  }
+  
+  /**
+   * Convert the input documents into token arrays, one {@link StringTuple} per
+   * document. The input documents have to be in {@link SequenceFile} format.
+   * 
+   * @param input
+   *          input directory of the documents in {@link SequenceFile} format
+   * @param output
+   *          output directory where the {@link StringTuple} token array of each
+   *          document has to be created
+   * @param analyzerClass
+   *          The Lucene {@link Analyzer} for tokenizing the UTF-8 text
+   * @throws IOException
+   */
+  public static void tokenizeDocuments(String input,
+                                       Class<? extends Analyzer> analyzerClass,
+                                       String output) throws IOException {
+    
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(DocumentProcessor.class);
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set to enable serialisation of conf values
+    
+    conf.set(ANALYZER_CLASS, analyzerClass.getName());
+    conf.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: "
+                    + input);
+    
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(StringTuple.class);
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(conf, outPath);
+    
+    conf.setMapperClass(SequenceFileTokenizerMapper.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setNumReduceTasks(0);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+    
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+}
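
tokenizeDocuments expects its input as SequenceFile<Text,Text> pairs of document id and raw UTF-8 text, as the javadoc above describes. A small sketch of writing such an input file; the path and contents are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class WriteRawDocumentsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path docs = new Path("/tmp/sequence-docs/part-00000");
        FileSystem fs = FileSystem.get(docs.toUri(), conf);

        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, docs,
            Text.class, Text.class);
        try {
          // key = unique document id, value = the whole UTF-8 document
          writer.append(new Text("doc-1"), new Text("The quick brown fox"));
          writer.append(new Text("doc-2"), new Text("jumps over the lazy dog"));
        } finally {
          writer.close();
        }
      }
    }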

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.document;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.TermAttribute;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
+import org.apache.mahout.utils.vectors.text.DocumentProcessor;
+
+/**
+ * Tokenizes a text document and outputs tokens in a StringTuple
+ */
+public class SequenceFileTokenizerMapper extends MapReduceBase implements
+    Mapper<Text,Text,Text,StringTuple> {
+  
+  private Analyzer analyzer;
+  
+  @Override
+  public void map(Text key,
+                  Text value,
+                  OutputCollector<Text,StringTuple> output,
+                  Reporter reporter) throws IOException {
+    TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(
+        value.toString()));
+    TermAttribute termAtt = (TermAttribute) stream
+        .addAttribute(TermAttribute.class);
+    StringTuple document = new StringTuple();
+    while (stream.incrementToken()) {
+      if (termAtt.termLength() > 0) {
+        document.add(new String(termAtt.termBuffer(), 0, termAtt.termLength()));
+      }
+    }
+    output.collect(key,document);
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      Class<?> cl = ccl.loadClass(job.get(DocumentProcessor.ANALYZER_CLASS,
+        StandardAnalyzer.class.getName()));
+      analyzer = (Analyzer) cl.newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+}
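
The map body above boils down to running a Lucene TokenStream and collecting TermAttribute values into a StringTuple. The same loop as a standalone sketch, assuming a Lucene 2.9-era StandardAnalyzer that takes a Version argument:

    import java.io.StringReader;

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.analysis.tokenattributes.TermAttribute;
    import org.apache.lucene.util.Version;
    import org.apache.mahout.common.StringTuple;

    public class TokenizeOneDocumentSketch {
      public static void main(String[] args) throws Exception {
        Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_CURRENT);
        TokenStream stream = analyzer.tokenStream("doc-1",
            new StringReader("The Quick Brown Fox"));
        TermAttribute termAtt = (TermAttribute) stream
            .addAttribute(TermAttribute.class);

        StringTuple document = new StringTuple();
        while (stream.incrementToken()) {
          if (termAtt.termLength() > 0) {
            document.add(new String(termAtt.termBuffer(), 0, termAtt.termLength()));
          }
        }
        // StandardAnalyzer lowercases and drops stopwords: e.g. [quick, brown, fox]
        System.out.println(document.getEntries());
      }
    }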

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.term;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Converts a document into a sparse vector
+ */
+public class TFPartialVectorReducer extends MapReduceBase implements
+    Reducer<Text,StringTuple,Text,VectorWritable> {
+  private Analyzer analyzer;
+  private final Map<String,int[]> dictionary = new HashMap<String,int[]>();
+  
+  private final VectorWritable vectorWritable = new VectorWritable();
+  
+  @Override
+  public void reduce(Text key,
+                     Iterator<StringTuple> values,
+                     OutputCollector<Text,VectorWritable> output,
+                     Reporter reporter) throws IOException {
+    if (values.hasNext() == false) return;
+    StringTuple value = values.next();
+    
+    Vector vector = new RandomAccessSparseVector(key.toString(),
+        Integer.MAX_VALUE, value.length()); // guess at initial size
+    
+    for (String tk : value.getEntries()) {
+      if (dictionary.containsKey(tk) == false) continue;
+      int tokenKey = dictionary.get(tk)[0];
+      vector.setQuick(tokenKey, vector.getQuick(tokenKey) + 1);
+    }
+    
+    vectorWritable.set(vector);
+    output.collect(key, vectorWritable);
+    
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      
+      URI[] localFiles = DistributedCache.getCacheFiles(job);
+      if (localFiles == null || localFiles.length < 1) {
+        throw new IllegalArgumentException(
+            "missing paths from the DistributedCache");
+      }
+      Path dictionaryFile = new Path(localFiles[0].getPath());
+      FileSystem fs = dictionaryFile.getFileSystem(job);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile,
+          job);
+      Text key = new Text();
+      LongWritable value = new LongWritable();
+      
+      // key is word value is id
+      while (reader.next(key, value)) {
+        dictionary.put(key.toString(), new int[] {Long.valueOf(value.get())
+            .intValue()});
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+}
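
The reduce body amounts to a dictionary lookup per token and an increment at that term's index; terms missing from the current dictionary chunk are skipped. A standalone sketch with a tiny hand-built dictionary (the real one is loaded from the DistributedCache in configure() above):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.mahout.common.StringTuple;
    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;

    public class TfCountingSketch {
      public static void main(String[] args) {
        Map<String,int[]> dictionary = new HashMap<String,int[]>();
        dictionary.put("quick", new int[] {0});
        dictionary.put("fox", new int[] {1});

        StringTuple tokens = new StringTuple();
        tokens.add("quick");
        tokens.add("quick");
        tokens.add("fox");
        tokens.add("unknown");   // not in this dictionary chunk, so it is skipped

        Vector vector = new RandomAccessSparseVector("doc-1", Integer.MAX_VALUE,
            tokens.length());
        for (String tk : tokens.getEntries()) {
          if (!dictionary.containsKey(tk)) {
            continue;
          }
          int tokenKey = dictionary.get(tk)[0];
          vector.setQuick(tokenKey, vector.getQuick(tokenKey) + 1);
        }
        System.out.println(vector.getQuick(0) + " " + vector.getQuick(1));  // 2.0 1.0
      }
    }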

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.term;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.StringTuple;
+
+/**
+ * TextVectorizer Term Count Mapper. Iterates over a tokenized document
+ * ({@link StringTuple}) and outputs the count of each word in it.
+ * 
+ */
+public class TermCountMapper extends MapReduceBase implements
+    Mapper<Text,StringTuple,Text,LongWritable> {
+  @Override
+  public void map(Text key,
+                  StringTuple value,
+                  OutputCollector<Text,LongWritable> output,
+                  Reporter reporter) throws IOException {
+    
+    Map<String,MutableLong> wordCount = new HashMap<String,MutableLong>();
+    for (String word : value.getEntries()) {
+      if (!wordCount.containsKey(word)) {
+        wordCount.put(word, new MutableLong(0));
+      }
+      wordCount.get(word).increment();
+    }
+    
+    for (Entry<String,MutableLong> entry : wordCount.entrySet()) {
+      output.collect(new Text(entry.getKey()), new LongWritable(entry
+          .getValue().longValue()));
+    }
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.term;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Sums the term counts emitted for each word. Because the operation is
+ * associative, this reducer can also be used as a local combiner.
+ */
+public class TermCountReducer extends MapReduceBase implements
+    Reducer<Text,LongWritable,Text,LongWritable> {
+  
+  @Override
+  public void reduce(Text key,
+                     Iterator<LongWritable> values,
+                     OutputCollector<Text,LongWritable> output,
+                     Reporter reporter) throws IOException {
+    long sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    output.collect(key, new LongWritable(sum));
+    
+  }
+  
+}
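
TermCountMapper and TermCountReducer together form a straightforward word-count job over the tokenized documents, with the reducer doubling as a combiner since summation is associative. The following wiring sketch is only an illustration with placeholder paths and job name, not how the commit itself drives the job:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.mahout.utils.vectors.text.term.TermCountMapper;
    import org.apache.mahout.utils.vectors.text.term.TermCountReducer;

    public class WordCountJobSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCountJobSketch.class);
        conf.setJobName("sketch: term count over tokenized documents");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        conf.setMapperClass(TermCountMapper.class);
        conf.setCombinerClass(TermCountReducer.class); // reducer reused as local combiner
        conf.setReducerClass(TermCountReducer.class);

        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        // placeholder paths for tokenized input and word-count output
        FileInputFormat.setInputPaths(conf, new Path("output/tokenized-documents"));
        FileOutputFormat.setOutputPath(conf, new Path("output/wordcount-sketch"));

        JobClient.runJob(conf);
      }
    }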

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.term;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * TextVectorizer Document Frequency Count Mapper. Outputs a count of 1 for
+ * each distinct feature of a document vector, plus one record under the
+ * reserved key -1 that is later summed into the total document count.
+ * 
+ */
+public class TermDocumentCountMapper extends MapReduceBase implements
+    Mapper<WritableComparable<?>,VectorWritable,IntWritable,LongWritable> {
+  
+  private static final LongWritable ONE = new LongWritable(1);
+  private static final IntWritable TOTAL_COUNT = new IntWritable(-1);
+  
+  @Override
+  public void map(WritableComparable<?> key,
+                  VectorWritable value,
+                  OutputCollector<IntWritable,LongWritable> output,
+                  Reporter reporter) throws IOException {
+    Vector vector = value.get();
+    Iterator<Element> it = vector.iterateNonZero();
+    
+    while (it.hasNext()) {
+      Element e = it.next();
+      output.collect(new IntWritable(e.index()), ONE);
+    }
+    output.collect(TOTAL_COUNT, ONE);
+  }
+}
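
Each input document vector therefore contributes a count of 1 per distinct feature, and one extra record under the reserved key -1 from which the driver later recovers the total number of documents. A small sketch of the same bookkeeping outside Hadoop (the documents array below is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class DocumentFrequencySketch {
      public static void main(String[] args) {
        // hypothetical corpus: each row lists the distinct feature ids of one document
        int[][] documents = {{0, 1}, {1, 2}, {1}};

        Map<Integer,Long> documentFrequency = new HashMap<Integer,Long>();
        long totalDocuments = 0; // played by the reserved key -1 in the real job

        for (int[] features : documents) {
          for (int featureId : features) {
            Long current = documentFrequency.get(featureId);
            documentFrequency.put(featureId, current == null ? 1L : current + 1L);
          }
          totalDocuments++;
        }
        System.out.println(documentFrequency + ", totalDocuments=" + totalDocuments);
        // prints {0=1, 1=3, 2=1}, totalDocuments=3
      }
    }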

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text.term;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Sums the document frequency counts emitted for each feature. Because the
+ * operation is associative, this reducer can also be used as a local combiner.
+ */
+public class TermDocumentCountReducer extends MapReduceBase implements
+    Reducer<IntWritable,LongWritable,IntWritable,LongWritable> {
+  
+  @Override
+  public void reduce(IntWritable key,
+                     Iterator<LongWritable> values,
+                     OutputCollector<IntWritable,LongWritable> output,
+                     Reporter reporter) throws IOException {
+    long sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    output.collect(key, new LongWritable(sum));
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.tfidf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.utils.vectors.common.PartialVectorMergeReducer;
+import org.apache.mahout.utils.vectors.common.PartialVectorMerger;
+import org.apache.mahout.utils.vectors.text.term.TermDocumentCountMapper;
+import org.apache.mahout.utils.vectors.text.term.TermDocumentCountReducer;
+
+/**
+ * This class converts a set of input vectors with term frequencies to TfIdf
+ * vectors. The sequence file input should have a {@link WritableComparable}
+ * key containing the document id and a {@link VectorWritable} value containing
+ * the term frequency vector. This conversion class uses multiple map/reduce
+ * jobs to convert the vectors to TfIdf format.
+ * 
+ */
+public final class TFIDFConverter {
+  
+  public static final String VECTOR_COUNT = "vector.count";
+  
+  public static final String FEATURE_COUNT = "feature.count";
+  
+  public static final String MIN_DF = "min.df";
+  
+  public static final String MAX_DF_PERCENTAGE = "max.df.percentage";
+  
+  private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "/vectors";
+  
+  private static final String FREQUENCY_FILE = "/frequency.file-";
+  
+  private static final int MAX_CHUNKSIZE = 10000;
+  
+  private static final int MIN_CHUNKSIZE = 100;
+  
+  private static final String OUTPUT_FILES_PATTERN = "/part-*";
+  
+  private static final int SEQUENCEFILE_BYTE_OVERHEAD = 45;
+  
+  private static final String VECTOR_OUTPUT_FOLDER = "/partial-vectors-";
+  
+  private static final String WORDCOUNT_OUTPUT_FOLDER = "/df-count";
+  
+  /**
+   * This class cannot be instantiated. Use the static functions.
+   */
+  private TFIDFConverter() {
+
+  }
+  
+  /**
+   * Create Term Frequency-Inverse Document Frequency (Tf-Idf) Vectors from the
+   * input set of vectors in {@link SequenceFile} format. This job uses a fixed
+   * limit on the maximum memory used by the feature chunk per node thereby
+   * splitting the process across multiple map/reduces.
+   * 
+   * @param input
+   *          input directory of the vectors in {@link SequenceFile} format
+   * @param output
+   *          output directory where
+   *          {@link org.apache.mahout.math.RandomAccessSparseVector}'s of the
+   *          document are generated
+   * @param minDf
+   *          The minimum document frequency. Default 1
+   * @param maxDFPercent
+   *          The maximum document frequency, expressed as a percentage of the
+   *          total number of vectors (an integer between 0 and 100). Can be
+   *          used to remove very high frequency features. Default 99
+   * @param chunkSizeInMegabytes
+   *          the size in MB of the document-frequency chunk to be kept in
+   *          memory at each node during the Map/Reduce stage. It is
+   *          recommended that you calculate this from the number of cores and
+   *          the free memory available per node. For example, with 2 cores and
+   *          around 1GB of memory to spare, a chunk size of around 400-500MB
+   *          lets two simultaneous reducers create partial vectors without
+   *          thrashing the system due to increased swapping
+   * @param normPower
+   *          The normalization power applied when merging the partial vectors;
+   *          pass {@link PartialVectorMerger#NO_NORMALIZING} to skip
+   *          normalization
+   * @throws IOException
+   */
+  public static void processTfIdf(String input,
+                                  String output,
+                                  int chunkSizeInMegabytes,
+                                  int minDf,
+                                  int maxDFPercent,
+                                  float normPower) throws IOException {
+    if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
+      chunkSizeInMegabytes = MIN_CHUNKSIZE;
+    } else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
+      chunkSizeInMegabytes = MAX_CHUNKSIZE;
+    }
+    
+    if (normPower != PartialVectorMerger.NO_NORMALIZING && normPower < 0) {
+      throw new IllegalArgumentException("normPower must either be -1 or >= 0");
+    }
+    
+    if (minDf < 1) minDf = 1;
+    if (maxDFPercent < 0 || maxDFPercent > 100) maxDFPercent = 99;
+    
+    Path inputPath = new Path(input);
+    Path wordCountPath = new Path(output + WORDCOUNT_OUTPUT_FOLDER);
+    
+    startDFCounting(inputPath, wordCountPath);
+    Pair<Long[],List<Path>> datasetFeatures = createDictionaryChunks(
+      wordCountPath, output, chunkSizeInMegabytes);
+    
+    int partialVectorIndex = 0;
+    List<Path> partialVectorPaths = new ArrayList<Path>();
+    List<Path> dictionaryChunks = datasetFeatures.getSecond();
+    for (Path dictionaryChunk : dictionaryChunks) {
+      Path partialVectorOutputPath = getPath(output + VECTOR_OUTPUT_FOLDER,
+        partialVectorIndex++);
+      partialVectorPaths.add(partialVectorOutputPath);
+      makePartialVectors(input, datasetFeatures.getFirst()[0], datasetFeatures
+          .getFirst()[1], minDf, maxDFPercent, dictionaryChunk,
+        partialVectorOutputPath);
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
+    
+    String outputDir = output + DOCUMENT_VECTOR_OUTPUT_FOLDER;
+    if (dictionaryChunks.size() > 1) {
+      PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir,
+        normPower);
+      HadoopUtil.deletePaths(partialVectorPaths, fs);
+    } else {
+      Path singlePartialVectorOutputPath = partialVectorPaths.get(0);
+      HadoopUtil.deletePath(outputDir, fs);
+      HadoopUtil.rename(singlePartialVectorOutputPath, new Path(outputDir), fs);
+    }
+  }
+  
+  /**
+   * Read the document frequency list which is built at the end of the DF count
+   * job and split it into chunks. This uses constant memory and runs at the
+   * speed of your disk read.
+   * 
+   * @param featureCountPath
+   *          path of the DF count output
+   * @param dictionaryPathBase
+   *          base path under which the frequency chunk files are written
+   * @param chunkSizeInMegabytes
+   *          maximum size in MB of each frequency chunk
+   * @throws IOException
+   */
+  private static Pair<Long[],List<Path>> createDictionaryChunks(Path featureCountPath,
+                                                                String dictionaryPathBase,
+                                                                int chunkSizeInMegabytes) throws IOException {
+    List<Path> chunkPaths = new ArrayList<Path>();
+    
+    IntWritable key = new IntWritable();
+    LongWritable value = new LongWritable();
+    Configuration conf = new Configuration();
+    
+    FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
+    FileStatus[] outputFiles = fs.globStatus(new Path(featureCountPath.toString()
+        + OUTPUT_FILES_PATTERN));
+    
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L; // long arithmetic avoids int overflow
+    int chunkIndex = 0;
+    Path chunkPath = getPath(dictionaryPathBase + FREQUENCY_FILE, chunkIndex);
+    chunkPaths.add(chunkPath);
+    SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf,
+        chunkPath, IntWritable.class, LongWritable.class);
+    
+    long currentChunkSize = 0;
+    long featureCount = 0;
+    long vectorCount = Long.MAX_VALUE;
+    for (FileStatus fileStatus : outputFiles) {
+      Path path = fileStatus.getPath();
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      // key is the feature id, value is its document frequency count
+      while (reader.next(key, value)) {
+        if (currentChunkSize > chunkSizeLimit) {
+          freqWriter.close();
+          chunkIndex++;
+          
+          chunkPath = getPath(dictionaryPathBase + FREQUENCY_FILE, chunkIndex);
+          chunkPaths.add(chunkPath);
+          
+          freqWriter = new SequenceFile.Writer(fs, conf, chunkPath,
+              IntWritable.class, LongWritable.class);
+          currentChunkSize = 0;
+        }
+        
+        int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + (Integer.SIZE / 8)
+                        + (Long.SIZE / 8);
+        currentChunkSize += fieldSize;
+        if (key.get() >= 0) {
+          freqWriter.append(key, value);
+        } else if (key.get() == -1) {
+          vectorCount = value.get();
+        }
+        featureCount++;
+        
+      }
+      reader.close();
+    }
+    freqWriter.close();
+    Long[] counts = {Long.valueOf(featureCount), Long.valueOf(vectorCount)};
+    return new Pair<Long[],List<Path>>(counts, chunkPaths);
+  }
+  
+  public static Path getPath(String basePath, int index) {
+    return new Path(basePath + index);
+  }
+  
+  /**
+   * Create partial tfidf vectors using a chunk of features from the input
+   * vectors. The input vectors have to be in {@link SequenceFile} format.
+   * 
+   * @param input
+   *          input directory of the vectors in {@link SequenceFile} format
+   * @param featureCount
+   *          Number of unique features in the dataset
+   * @param vectorCount
+   *          Number of vectors in the dataset
+   * @param minDf
+   *          The minimum document frequency. Default 1
+   * @param maxDFPercent
+   *          The maximum document frequency, expressed as a percentage of the
+   *          total number of vectors (an integer between 0 and 100). Can be
+   *          used to remove very high frequency features. Default 99
+   * @param dictionaryFilePath
+   *          location of the chunk of feature ids and their document frequencies
+   * @param output
+   *          output directory where the partial vectors are created
+   * @throws IOException
+   */
+  private static void makePartialVectors(String input,
+                                         Long featureCount,
+                                         Long vectorCount,
+                                         int minDf,
+                                         int maxDFPercent,
+                                         Path dictionaryFilePath,
+                                         Path output) throws IOException {
+    
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(TFIDFConverter.class);
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set to enable serialization of conf values
+    
+    conf.setJobName("TFIDFConverter:: MakePartialVectors: input-folder: "
+                    + input + ", dictionary-file: "
+                    + dictionaryFilePath.toString());
+    conf.setLong(FEATURE_COUNT, featureCount.longValue());
+    conf.setLong(VECTOR_COUNT, vectorCount.longValue());
+    conf.setInt(MIN_DF, minDf);
+    conf.setInt(MAX_DF_PERCENTAGE, maxDFPercent);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    DistributedCache
+        .setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    
+    FileOutputFormat.setOutputPath(conf, output);
+    
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setReducerClass(TFIDFPartialVectorReducer.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileSystem dfs = FileSystem.get(output.toUri(), conf);
+    if (dfs.exists(output)) {
+      dfs.delete(output, true);
+    }
+    
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+  
+  /**
+   * Count the document frequencies of features in parallel using Map/Reduce.
+   * The input documents have to be in {@link SequenceFile} format
+   */
+  private static void startDFCounting(Path input, Path output) throws IOException {
+    
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(TFIDFConverter.class);
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set to enable serialization of conf values
+    
+    conf.setJobName("VectorTfIdf Document Frequency Count running over input: "
+                    + input.toString());
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(LongWritable.class);
+    
+    FileInputFormat.setInputPaths(conf, input);
+    Path outPath = output;
+    FileOutputFormat.setOutputPath(conf, outPath);
+    
+    conf.setMapperClass(TermDocumentCountMapper.class);
+    
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setCombinerClass(TermDocumentCountReducer.class);
+    conf.setReducerClass(TermDocumentCountReducer.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+}
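
Putting the pieces together, a typical invocation of the converter takes the term-frequency vectors produced earlier and writes tf-idf vectors under the output directory, as the updated DictionaryVectorizerTest below also does. A sketch with placeholder paths, using the same parameter values as that test:

    import org.apache.mahout.utils.vectors.tfidf.TFIDFConverter;

    public class TfIdfPipelineSketch {
      public static void main(String[] args) throws Exception {
        // placeholder paths: term-frequency vectors produced by the earlier
        // word-count / partial-vector stage
        String tfVectors = "output/wordcount/vectors";
        String tfidfOutput = "output/tfidf/";

        // chunkSizeInMegabytes=100, minDf=1, maxDFPercent=99, normPower=1.0f
        TFIDFConverter.processTfIdf(tfVectors, tfidfOutput, 100, 1, 99, 1.0f);
      }
    }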

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java?rev=906869&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java Fri Feb  5 09:27:04 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.tfidf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.utils.vectors.TFIDF;
+
+/**
+ * Converts a term frequency vector into a tf-idf weighted sparse vector.
+ */
+public class TFIDFPartialVectorReducer extends MapReduceBase
+    implements
+    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+  
+  private final OpenIntLongHashMap dictionary = new OpenIntLongHashMap();
+  private final VectorWritable vectorWritable = new VectorWritable();
+  private final TFIDF tfidf = new TFIDF();
+  private int minDf = 1;
+  private int maxDfPercent = 99;
+  private long vectorCount = 1;
+  private long featureCount = 0;
+  
+  @Override
+  public void reduce(WritableComparable<?> key,
+                     Iterator<VectorWritable> values,
+                     OutputCollector<WritableComparable<?>,VectorWritable> output,
+                     Reporter reporter) throws IOException {
+    if (!values.hasNext()) return;
+    Vector value = values.next().get();
+    Iterator<Element> it = value.iterateNonZero();
+    Vector vector = new RandomAccessSparseVector(key.toString(),
+        Integer.MAX_VALUE, value.getNumNondefaultElements());
+    while (it.hasNext()) {
+      Element e = it.next();
+      if (!dictionary.containsKey(e.index())) continue;
+      long df = dictionary.get(e.index());
+      if (df * 100 / vectorCount > maxDfPercent) continue; // compare as a percentage
+      if (df < minDf) df = minDf;
+      vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df,
+        (int) featureCount, (int) vectorCount));
+    }
+    
+    vectorWritable.set(vector);
+    output.collect(key, vectorWritable);
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      
+      URI[] localFiles = DistributedCache.getCacheFiles(job);
+      if (localFiles == null || localFiles.length < 1) {
+        throw new IllegalArgumentException(
+            "missing paths from the DistributedCache");
+      }
+      
+      vectorCount = job.getLong(TFIDFConverter.VECTOR_COUNT, 1);
+      featureCount = job.getLong(TFIDFConverter.FEATURE_COUNT, 1);
+      minDf = job.getInt(TFIDFConverter.MIN_DF, 1);
+      maxDfPercent = job.getInt(TFIDFConverter.MAX_DF_PERCENTAGE, 99);
+      
+      Path dictionaryFile = new Path(localFiles[0].getPath());
+      FileSystem fs = dictionaryFile.getFileSystem(job);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile,
+          job);
+      IntWritable key = new IntWritable();
+      LongWritable value = new LongWritable();
+      
+      // key is feature, value is the document frequency
+      while (reader.next(key, value)) {
+        dictionary.put(key.get(), value.get());
+      }
+      reader.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
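
For intuition, the weight written for each surviving feature grows with its in-document frequency and shrinks with its document frequency. The sketch below uses the textbook tf * log(N/df) weighting as an assumption; the actual TFIDF class used above (backed by a Lucene similarity) applies its own damping of tf and idf, so treat this only as an approximation of the shape of the weights:

    public class TfIdfWeightSketch {

      // textbook tf-idf used purely as an illustration; Mahout's TFIDF class
      // may damp the term frequency and smooth the idf differently
      static double weight(int termFreq, long docFreq, long numDocs) {
        double idf = Math.log((double) numDocs / docFreq);
        return termFreq * idf;
      }

      public static void main(String[] args) {
        long numDocs = 1000;
        System.out.println(weight(3, 10, numDocs));  // rare term -> larger weight
        System.out.println(weight(3, 900, numDocs)); // common term -> smaller weight
      }
    }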

Modified: lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java?rev=906869&r1=906868&r2=906869&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java (original)
+++ lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java Fri Feb  5 09:27:04 2010
@@ -32,6 +32,7 @@
 import org.apache.lucene.util.Version;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.utils.vectors.tfidf.TFIDFConverter;
 
 /**
  * Test the dictionary Vector
@@ -50,8 +51,8 @@
   
   public static final String DELIM = " .,?;:!\t\n\r";
   
-  public static final String ERRORSET =
-      "`1234567890" + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
+  public static final String ERRORSET = "`1234567890"
+                                        + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
   
   private static final Random random = RandomUtils.getRandom();
   
@@ -62,10 +63,10 @@
   }
   
   public static String getRandomDocument() {
-    int length =
-        (AVG_DOCUMENT_LENGTH >> 1) + random.nextInt(AVG_DOCUMENT_LENGTH);
-    StringBuilder sb =
-        new StringBuilder(length * AVG_SENTENCE_LENGTH * AVG_WORD_LENGTH);
+    int length = (AVG_DOCUMENT_LENGTH >> 1)
+                 + random.nextInt(AVG_DOCUMENT_LENGTH);
+    StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH
+                                         * AVG_WORD_LENGTH);
     for (int i = 0; i < length; i++) {
       sb.append(getRandomSentence());
     }
@@ -73,8 +74,8 @@
   }
   
   public static String getRandomSentence() {
-    int length =
-        (AVG_SENTENCE_LENGTH >> 1) + random.nextInt(AVG_SENTENCE_LENGTH);
+    int length = (AVG_SENTENCE_LENGTH >> 1)
+                 + random.nextInt(AVG_SENTENCE_LENGTH);
     StringBuilder sb = new StringBuilder(length * AVG_WORD_LENGTH);
     for (int i = 0; i < length; i++) {
       sb.append(getRandomString()).append(' ');
@@ -123,18 +124,21 @@
     Configuration conf = new Configuration();
     String pathString = "testdata/documents/docs.file";
     Path path = new Path(pathString);
-    SequenceFile.Writer writer =
-        new SequenceFile.Writer(fs, conf, path, Text.class, Text.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, Text.class);
     
     for (int i = 0; i < NUM_DOCS; i++) {
       writer.append(new Text("Document::ID::" + i), new Text(
           getRandomDocument()));
     }
     writer.close();
-    Class<? extends Analyzer> analyzer =
-        new StandardAnalyzer(Version.LUCENE_CURRENT).getClass();
-    DictionaryVectorizer.createTermFrequencyVectors(pathString,
-        "output/wordcount", analyzer, 2, 100);
+    Class<? extends Analyzer> analyzer = new StandardAnalyzer(
+        Version.LUCENE_CURRENT).getClass();
+    DocumentProcessor.tokenizeDocuments(pathString, analyzer,
+      "output/tokenized-documents");
+    DictionaryVectorizer.createTermFrequencyVectors("output/tokenized-documents",
+      "output/wordcount", 2, 100);
+    TFIDFConverter.processTfIdf("output/wordcount/vectors", "output/tfidf/", 100, 1, 99, 1.0f);
     
   }
 }


