mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From robina...@apache.org
Subject svn commit: r906896 [1/2] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/ core/src/main...
Date Fri, 05 Feb 2010 11:00:26 GMT
Author: robinanil
Date: Fri Feb  5 11:00:25 2010
New Revision: 906896

URL: http://svn.apache.org/viewvc?rev=906896&view=rev
Log:
MAHOUT-221 Parallel FPGrowth with FPBonsai pruning. Code tweaks and style changes mostly

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConverter.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConverter.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConverter.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleMapper.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleReducer.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Fri Feb  5 11:00:25 2010
@@ -28,23 +28,28 @@
 
 /**
  * 
- * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so that reducer can group them
- * and select the top K frequent patterns
+ * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so
+ * that reducer can group them and select the top K frequent patterns
  * 
  */
-public class AggregatorMapper extends Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
-
+public class AggregatorMapper extends
+    Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> {
+  
   @Override
-  protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
-      InterruptedException {
-    for (Pair<List<String>, Long> pattern : values.getPatterns()) {
+  protected void map(Text key,
+                     TopKStringPatterns values,
+                     Context context) throws IOException,
+                                     InterruptedException {
+    for (Pair<List<String>,Long> pattern : values.getPatterns()) {
       for (String item : pattern.getFirst()) {
-        List<Pair<List<String>, Long>> patternSingularList = new ArrayList<Pair<List<String>, Long>>();
+        List<Pair<List<String>,Long>> patternSingularList
+            = new ArrayList<Pair<List<String>,Long>>();
         patternSingularList.add(pattern);
         context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
-        context.write(new Text(item), new TopKStringPatterns(patternSingularList));
+        context.write(new Text(item), new TopKStringPatterns(
+            patternSingularList));
       }
     }
-
+    
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Fri Feb  5 11:00:25 2010
@@ -31,29 +31,32 @@
  * 
  */
 public class AggregatorReducer extends
-    Reducer<Text, TopKStringPatterns, Text, TopKStringPatterns> {
-
+    Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> {
+  
   private int maxHeapSize = 50;
-
+  
   @Override
-  protected void reduce(Text key, Iterable<TopKStringPatterns> values,
-      Context context) throws IOException, InterruptedException {
+  protected void reduce(Text key,
+                        Iterable<TopKStringPatterns> values,
+                        Context context) throws IOException,
+                                        InterruptedException {
     TopKStringPatterns patterns = new TopKStringPatterns();
     for (TopKStringPatterns value : values) {
-      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
+      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: "
+                        + key);
       patterns = patterns.merge(value, maxHeapSize);
     }
     context.write(key, patterns);
-
+    
   }
-
+  
   @Override
   protected void setup(Context context) throws IOException,
-      InterruptedException {
+                                       InterruptedException {
     super.setup(context);
     Parameters params = Parameters.fromString(context.getConfiguration().get(
-        "pfp.parameters", ""));
+      "pfp.parameters", ""));
     maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
-
+    
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java Fri Feb  5 11:00:25 2010
@@ -22,31 +22,38 @@
 
 import org.apache.mahout.common.Pair;
 
-public final class MultiTransactionTreeIterator implements Iterator<List<Integer>> {
 
-  private Iterator<Pair<List<Integer>, Long>> pIterator = null;
-
-  private Pair<List<Integer>, Long> currentPattern = null;
-
-  private long currentCount = 0;
-
-  public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>, Long>> iterator) {
+/**
+ * Iterates over multiple transaction trees to produce a single iterator of
+ * transactions
+ * 
+ */
+public final class MultiTransactionTreeIterator implements
+    Iterator<List<Integer>> {
+  
+  private Iterator<Pair<List<Integer>,Long>> pIterator;
+  
+  private Pair<List<Integer>,Long> currentPattern;
+  
+  private long currentCount;
+  
+  public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>,Long>> iterator) {
     this.pIterator = iterator;
-
+    
     if (pIterator.hasNext()) {
       currentPattern = pIterator.next();
       currentCount = 0;
     } else {
       pIterator = null;
     }
-
+    
   }
-
+  
   @Override
   public boolean hasNext() {
     return pIterator != null;
   }
-
+  
   @Override
   public List<Integer> next() {
     List<Integer> returnable = currentPattern.getFirst();
@@ -61,10 +68,10 @@
     }
     return returnable;
   }
-
+  
   @Override
   public void remove() {
     throw new UnsupportedOperationException();
   }
-
+  
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Fri Feb  5 11:00:25 2010
@@ -51,18 +51,18 @@
 
 /**
  * 
- * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in the paper
- * http://infolab.stanford.edu/~echang/recsys08-69.pdf
+ * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in
+ * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
  * 
  */
 public final class PFPGrowth {
-  public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
-
+  public static final Pattern SPLITTER = Pattern
+      .compile("[ ,\t]*[,|\t][ ,\t]*");
+  
   private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
-
-  private PFPGrowth() {
-  }
-
+  
+  private PFPGrowth() { }
+  
   /**
    * Generates the fList from the serialized string representation
    * 
@@ -72,23 +72,25 @@
    * @return Deserialized Feature Frequency List
    * @throws IOException
    */
-  public static List<Pair<String, Long>> deserializeList(Parameters params, String key, Configuration conf)
-      throws IOException {
-    List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-        + "org.apache.hadoop.io.serializer.WritableSerialization");
-
-    DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+  public static List<Pair<String,Long>> deserializeList(Parameters params,
+                                                        String key,
+                                                        Configuration conf) throws IOException {
+    List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    
+    DefaultStringifier<List<Pair<String,Long>>> listStringifier = new DefaultStringifier<List<Pair<String,Long>>>(
         conf, GenericsUtil.getClass(list));
     String serializedString = listStringifier.toString(list);
     serializedString = params.get(key, serializedString);
     list = listStringifier.fromString(serializedString);
     return list;
   }
-
+  
   /**
-   * Generates the gList(Group ID Mapping of Various frequent Features) Map from the corresponding serialized
-   * representation
+   * Generates the gList(Group ID Mapping of Various frequent Features) Map from
+   * the corresponding serialized representation
    * 
    * @param params
    * @param key
@@ -96,48 +98,54 @@
    * @return Deserialized Group List
    * @throws IOException
    */
-  public static Map<String, Long> deserializeMap(Parameters params, String key, Configuration conf)
-      throws IOException {
-    Map<String, Long> map = new HashMap<String, Long>();
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-        + "org.apache.hadoop.io.serializer.WritableSerialization");
-
-    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
-        GenericsUtil.getClass(map));
+  public static Map<String,Long> deserializeMap(Parameters params,
+                                                String key,
+                                                Configuration conf) throws IOException {
+    Map<String,Long> map = new HashMap<String,Long>();
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    
+    DefaultStringifier<Map<String,Long>> mapStringifier = new DefaultStringifier<Map<String,Long>>(
+        conf, GenericsUtil.getClass(map));
     String gListString = mapStringifier.toString(map);
     gListString = params.get(key, gListString);
     map = mapStringifier.fromString(gListString);
     return map;
   }
-
+  
   /**
-   * read the feature frequency List which is built at the end of the Parallel counting job
+   * read the feature frequency List which is built at the end of the Parallel
+   * counting job
    * 
    * @param params
    * @return Feature Frequency List
    * @throws IOException
    */
-  public static List<Pair<String, Long>> readFList(Parameters params) throws IOException {
+  public static List<Pair<String,Long>> readFList(Parameters params) throws IOException {
     Writable key = new Text();
     LongWritable value = new LongWritable();
     int minSupport = Integer.valueOf(params.get("minSupport", "3"));
     Configuration conf = new Configuration();
-
-    FileSystem fs = FileSystem.get(new Path(params.get("output") + "/parallelcounting").toUri(), conf);
-    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/parallelcounting/part-*"));
-
-    PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(11,
-        new Comparator<Pair<String, Long>>() {
-
+    
+    FileSystem fs = FileSystem.get(new Path(params.get("output")
+                                            + "/parallelcounting").toUri(),
+      conf);
+    FileStatus[] outputFiles = fs.globStatus(new Path(
+        params.get("output") + "/parallelcounting/part-*"));
+    
+    PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(
+        11, new Comparator<Pair<String,Long>>() {
+          
           @Override
-          public int compare(Pair<String, Long> o1, Pair<String, Long> o2) {
+          public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
             int ret = o2.getSecond().compareTo(o1.getSecond());
             if (ret != 0) {
               return ret;
             }
             return o1.getFirst().compareTo(o2.getFirst());
           }
-
+          
         });
     for (FileStatus fileStatus : outputFiles) {
       Path path = fileStatus.getPath();
@@ -147,16 +155,16 @@
         if (value.get() < minSupport) {
           continue;
         }
-        queue.add(new Pair<String, Long>(key.toString(), value.get()));
+        queue.add(new Pair<String,Long>(key.toString(), value.get()));
       }
     }
-    List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
+    List<Pair<String,Long>> fList = new ArrayList<Pair<String,Long>>();
     while (queue.isEmpty() == false) {
       fList.add(queue.poll());
     }
     return fList;
   }
-
+  
   /**
    * Read the Frequent Patterns generated from Text
    * 
@@ -164,102 +172,111 @@
    * @return List of TopK patterns for each string frequent feature
    * @throws IOException
    */
-  public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(Parameters params)
-      throws IOException {
-
+  public static List<Pair<String,TopKStringPatterns>> readFrequentPattern(Parameters params) throws IOException {
+    
     Configuration conf = new Configuration();
-
-    FileSystem fs = FileSystem.get(new Path(params.get("output") + "/frequentPatterns").toUri(), conf);
-    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/frequentPatterns/part-*"));
-
-    List<Pair<String, TopKStringPatterns>> ret = new ArrayList<Pair<String, TopKStringPatterns>>();
+    
+    FileSystem fs = FileSystem.get(new Path(params.get("output")
+                                            + "/frequentPatterns").toUri(),
+      conf);
+    FileStatus[] outputFiles = fs.globStatus(new Path(
+        params.get("output") + "/frequentPatterns/part-*"));
+    
+    List<Pair<String,TopKStringPatterns>> ret = new ArrayList<Pair<String,TopKStringPatterns>>();
     for (FileStatus fileStatus : outputFiles) {
       Path path = fileStatus.getPath();
       ret.addAll(FPGrowth.readFrequentPattern(fs, conf, path));
     }
     return ret;
   }
-
+  
   /**
    * 
-   * @param params params should contain input and output locations as a string value, the additional
-   *        parameters include minSupport(3), maxHeapSize(50), numGroups(1000)
+   * @param params
+   *          params should contain input and output locations as a string
+   *          value, the additional parameters include minSupport(3),
+   *          maxHeapSize(50), numGroups(1000)
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
    */
-  public static void runPFPGrowth(Parameters params) throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public static void runPFPGrowth(Parameters params) throws IOException,
+                                                    InterruptedException,
+                                                    ClassNotFoundException {
     startParallelCounting(params);
     startGroupingItems(params);
     startTransactionSorting(params);
     startParallelFPGrowth(params);
     startAggregating(params);
   }
-
+  
   /**
-   * Run the aggregation Job to aggregate the different TopK patterns and group each Pattern by the features
-   * present in it and thus calculate the final Top K frequent Patterns for each feature
+   * Run the aggregation Job to aggregate the different TopK patterns and group
+   * each Pattern by the features present in it and thus calculate the final Top
+   * K frequent Patterns for each feature
    * 
    * @param params
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static void startAggregating(Parameters params) throws IOException, InterruptedException,
-      ClassNotFoundException {
-
+  public static void startAggregating(Parameters params) throws IOException,
+                                                        InterruptedException,
+                                                        ClassNotFoundException {
+    
     Configuration conf = new Configuration();
     params.set("fList", "");
     params.set("gList", "");
     conf.set("pfp.parameters", params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
-
+    
     String input = params.get("output") + "/fpgrowth";
-    Job job = new Job(conf, "PFP Aggregator Driver running over input: " + input);
+    Job job = new Job(conf, "PFP Aggregator Driver running over input: "
+                            + input);
     job.setJarByClass(PFPGrowth.class);
-
+    
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TopKStringPatterns.class);
-
+    
     FileInputFormat.addInputPath(job, new Path(input));
     Path outPath = new Path(params.get("output") + "/frequentPatterns");
     FileOutputFormat.setOutputPath(job, outPath);
-
+    
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(AggregatorMapper.class);
     job.setCombinerClass(AggregatorReducer.class);
     job.setReducerClass(AggregatorReducer.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+    
     FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
     if (dfs.exists(outPath)) {
       dfs.delete(outPath, true);
     }
     job.waitForCompletion(true);
   }
-
+  
   /**
-   * Group the given Features into g groups as defined by the numGroups parameter in params
+   * Group the given Features into g groups as defined by the numGroups
+   * parameter in params
    * 
    * @param params
    * @throws IOException
    */
   public static void startGroupingItems(Parameters params) throws IOException {
     Configuration conf = new Configuration();
-    List<Pair<String, Long>> fList = readFList(params);
+    List<Pair<String,Long>> fList = readFList(params);
     Integer numGroups = Integer.valueOf(params.get("numGroups", "50"));
-
-    Map<String, Long> gList = new HashMap<String, Long>();
+    
+    Map<String,Long> gList = new HashMap<String,Long>();
     long maxPerGroup = fList.size() / numGroups;
     if (fList.size() != maxPerGroup * numGroups) {
       maxPerGroup++;
     }
-
+    
     long i = 0;
     long groupID = 0;
-    for (Pair<String, Long> featureFreq : fList) {
+    for (Pair<String,Long> featureFreq : fList) {
       String feature = featureFreq.getFirst();
       if (i / maxPerGroup == groupID) {
         gList.put(feature, groupID);
@@ -269,13 +286,13 @@
       }
       i++;
     }
-
+    
     log.info("No of Features: {}", fList.size());
-
+    
     params.set("gList", serializeMap(gList, conf));
     params.set("fList", serializeList(fList, conf));
   }
-
+  
   /**
    * Count the frequencies of various features in parallel using Map/Reduce
    * 
@@ -284,52 +301,56 @@
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static void startParallelCounting(Parameters params) throws IOException, InterruptedException,
-      ClassNotFoundException {
-
+  public static void startParallelCounting(Parameters params) throws IOException,
+                                                             InterruptedException,
+                                                             ClassNotFoundException {
+    
     Configuration conf = new Configuration();
     conf.set("pfp.parameters", params.toString());
-
+    
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
-
+    
     String input = params.get("input");
-    Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
+    Job job = new Job(conf, "Parallel Counting Driver running over input: "
+                            + input);
     job.setJarByClass(PFPGrowth.class);
-
+    
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(LongWritable.class);
-
+    
     FileInputFormat.addInputPath(job, new Path(input));
     Path outPath = new Path(params.get("output") + "/parallelcounting");
     FileOutputFormat.setOutputPath(job, outPath);
-
+    
     FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
     if (dfs.exists(outPath)) {
       dfs.delete(outPath, true);
     }
-
+    
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapperClass(ParallelCountingMapper.class);
     job.setCombinerClass(ParallelCountingReducer.class);
     job.setReducerClass(ParallelCountingReducer.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+    
     job.waitForCompletion(true);
-
+    
   }
-
+  
   /**
-   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
+   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of
+   * group dependent shards
    * 
    * @param params
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static void startTransactionSorting(Parameters params) throws IOException, InterruptedException,
-      ClassNotFoundException {
-
+  public static void startTransactionSorting(Parameters params) throws IOException,
+                                                               InterruptedException,
+                                                               ClassNotFoundException {
+    
     Configuration conf = new Configuration();
     String gList = params.get("gList");
     params.set("gList", "");
@@ -337,44 +358,47 @@
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
     String input = params.get("input");
-    Job job = new Job(conf, "PFP Transaction Sorting running over input" + input);
+    Job job = new Job(conf, "PFP Transaction Sorting running over input"
+                            + input);
     job.setJarByClass(PFPGrowth.class);
-
+    
     job.setMapOutputKeyClass(LongWritable.class);
     job.setMapOutputValueClass(TransactionTree.class);
-
+    
     job.setOutputKeyClass(LongWritable.class);
     job.setOutputValueClass(TransactionTree.class);
-
+    
     FileInputFormat.addInputPath(job, new Path(input));
     Path outPath = new Path(params.get("output") + "/sortedoutput");
     FileOutputFormat.setOutputPath(job, outPath);
-
+    
     FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
     if (dfs.exists(outPath)) {
       dfs.delete(outPath, true);
     }
-
+    
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapperClass(TransactionSortingMapper.class);
     job.setReducerClass(TransactionSortingReducer.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+    
     job.waitForCompletion(true);
     params.set("gList", gList);
   }
-
+  
   /**
-   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
+   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of
+   * group dependent shards
    * 
    * @param params
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static void startParallelFPGrowth(Parameters params) throws IOException, InterruptedException,
-      ClassNotFoundException {
-
+  public static void startParallelFPGrowth(Parameters params) throws IOException,
+                                                             InterruptedException,
+                                                             ClassNotFoundException {
+    
     Configuration conf = new Configuration();
     conf.set("pfp.parameters", params.toString());
     conf.set("mapred.compress.map.output", "true");
@@ -382,31 +406,31 @@
     String input = params.get("output") + "/sortedoutput";
     Job job = new Job(conf, "PFP Growth Driver running over input" + input);
     job.setJarByClass(PFPGrowth.class);
-
+    
     job.setMapOutputKeyClass(LongWritable.class);
     job.setMapOutputValueClass(TransactionTree.class);
-
+    
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TopKStringPatterns.class);
-
+    
     FileInputFormat.addInputPath(job, new Path(input));
     Path outPath = new Path(params.get("output") + "/fpgrowth");
     FileOutputFormat.setOutputPath(job, outPath);
-
+    
     FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
     if (dfs.exists(outPath)) {
       dfs.delete(outPath, true);
     }
-
+    
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(ParallelFPGrowthMapper.class);
     job.setCombinerClass(ParallelFPGrowthCombiner.class);
     job.setReducerClass(ParallelFPGrowthReducer.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+    
     job.waitForCompletion(true);
   }
-
+  
   /**
    * Serializes the fList and returns the string representation of the List
    * 
@@ -415,14 +439,16 @@
    * @return Serialized String representation of List
    * @throws IOException
    */
-  private static String serializeList(List<Pair<String, Long>> list, Configuration conf) throws IOException {
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-        + "org.apache.hadoop.io.serializer.WritableSerialization");
-    DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+  private static String serializeList(List<Pair<String,Long>> list,
+                                      Configuration conf) throws IOException {
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<List<Pair<String,Long>>> listStringifier = new DefaultStringifier<List<Pair<String,Long>>>(
         conf, GenericsUtil.getClass(list));
     return listStringifier.toString(list);
   }
-
+  
   /**
    * Converts a given Map in to a String using DefaultStringifier of Hadoop
    * 
@@ -431,11 +457,12 @@
    * @return Serialized String representation of the GList Map
    * @throws IOException
    */
-  private static String serializeMap(Map<String, Long> map, Configuration conf) throws IOException {
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-        + "org.apache.hadoop.io.serializer.WritableSerialization");
-    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
-        GenericsUtil.getClass(map));
+  private static String serializeMap(Map<String,Long> map, Configuration conf) throws IOException {
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<Map<String,Long>> mapStringifier = new DefaultStringifier<Map<String,Long>>(
+        conf, GenericsUtil.getClass(map));
     return mapStringifier.toString(map);
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Fri Feb  5 11:00:25 2010
@@ -27,34 +27,40 @@
 
 /**
  * 
- * {@link ParallelCountingMapper} maps all items in a particular transaction like the way it is done in Hadoop
- * WordCount example
+ * {@link ParallelCountingMapper} maps all items in a particular transaction
+ * like the way it is done in Hadoop WordCount example
  * 
  */
-public class ParallelCountingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
-
-  private static final LongWritable one = new LongWritable(1);
-
-  private Pattern splitter = null;
-
+public class ParallelCountingMapper extends
+    Mapper<LongWritable,Text,Text,LongWritable> {
+  
+  private static final LongWritable ONE = new LongWritable(1);
+  
+  private Pattern splitter;
+  
   @Override
-  protected void map(LongWritable offset, Text input, Context context) throws IOException,
-      InterruptedException {
-
+  protected void map(LongWritable offset,
+                     Text input,
+                     Context context) throws IOException,
+                                     InterruptedException {
+    
     String[] items = splitter.split(input.toString());
     for (String item : items) {
       if (item.trim().length() == 0) {
         continue;
       }
       context.setStatus("Parallel Counting Mapper: " + item);
-      context.write(new Text(item), one);
+      context.write(new Text(item), ONE);
     }
   }
-
+  
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+                                       InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
-    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+      "pfp.parameters", ""));
+    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
+        .toString()));
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Fri Feb  5 11:00:25 2010
@@ -24,14 +24,18 @@
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
- * {@link ParallelCountingReducer} sums up the item count and output the item and the count This can also be
- * used as a local Combiner. A simple summing reducer
+ * {@link ParallelCountingReducer} sums up the item count and output the item
+ * and the count This can also be used as a local Combiner. A simple summing
+ * reducer
  */
-public class ParallelCountingReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-
+public class ParallelCountingReducer extends
+    Reducer<Text,LongWritable,Text,LongWritable> {
+  
   @Override
-  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
-      InterruptedException {
+  protected void reduce(Text key,
+                        Iterable<LongWritable> values,
+                        Context context) throws IOException,
+                                        InterruptedException {
     long sum = 0;
     for (LongWritable value : values) {
       context.setStatus("Parallel Counting Reducer :" + key);
@@ -39,6 +43,6 @@
     }
     context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
     context.write(key, new LongWritable(sum));
-
+    
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java Fri Feb  5 11:00:25 2010
@@ -31,25 +31,27 @@
  */
 
 public class ParallelFPGrowthCombiner extends
-    Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
-
+    Reducer<LongWritable,TransactionTree,LongWritable,TransactionTree> {
+  
   @Override
-  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
-      Context context) throws IOException, InterruptedException {
+  protected void reduce(LongWritable key,
+                        Iterable<TransactionTree> values,
+                        Context context) throws IOException,
+                                        InterruptedException {
     TransactionTree cTree = new TransactionTree();
     int count = 0;
     int node = 0;
     for (TransactionTree tr : values) {
-      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
+      Iterator<Pair<List<Integer>,Long>> it = tr.getIterator();
       while (it.hasNext()) {
-        Pair<List<Integer>, Long> p = it.next();
+        Pair<List<Integer>,Long> p = it.next();
         node += cTree.addPattern(p.getFirst(), p.getSecond());
         count++;
       }
     }
-
+    
     context.write(key, cTree.getCompressedTree());
-
+    
   }
-
+  
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Fri Feb  5 11:00:25 2010
@@ -32,58 +32,68 @@
 import org.apache.mahout.common.Parameters;
 
 /**
- * {@link ParallelFPGrowthMapper} maps each transaction to all unique items groups in the transaction. mapper
- * outputs the group id as key and the transaction as value
+ * {@link ParallelFPGrowthMapper} maps each transaction to all unique items
+ * groups in the transaction. mapper outputs the group id as key and the
+ * transaction as value
  * 
  */
 public class ParallelFPGrowthMapper extends
-    Mapper<LongWritable, TransactionTree, LongWritable, TransactionTree> {
-
-  private final Map<Integer, Long> gListInt = new HashMap<Integer, Long>();
-
+    Mapper<LongWritable,TransactionTree,LongWritable,TransactionTree> {
+  
+  private final Map<Integer,Long> gListInt = new HashMap<Integer,Long>();
+  
   @Override
-  protected void map(LongWritable offset, TransactionTree input, Context context) throws IOException,
-      InterruptedException {
-
-    Iterator<Pair<List<Integer>, Long>> it = input.getIterator();
+  protected void map(LongWritable offset,
+                     TransactionTree input,
+                     Context context) throws IOException,
+                                     InterruptedException {
+    
+    Iterator<Pair<List<Integer>,Long>> it = input.getIterator();
     while (it.hasNext()) {
-      Pair<List<Integer>, Long> pattern = it.next();
-      Integer[] prunedItems = pattern.getFirst().toArray(new Integer[pattern.getFirst().size()]);
-
+      Pair<List<Integer>,Long> pattern = it.next();
+      Integer[] prunedItems = pattern.getFirst().toArray(
+        new Integer[pattern.getFirst().size()]);
+      
       Set<Long> groups = new HashSet<Long>();
       for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
         // dependent
         // shards
         Integer item = prunedItems[j];
         Long groupID = gListInt.get(item);
-
+        
         if (groups.contains(groupID) == false) {
           Integer[] tempItems = new Integer[j + 1];
           System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
-          context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
-          context.write(new LongWritable(groupID), new TransactionTree(tempItems, pattern.getSecond()));
+          context.setStatus(
+            "Parallel FPGrowth: Generating Group Dependent transactions for: "
+            + item);
+          context.write(new LongWritable(groupID), new TransactionTree(
+              tempItems, pattern.getSecond()));
         }
         groups.add(groupID);
       }
     }
-
+    
   }
-
+  
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+                                       InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
-
-    Map<String, Integer> fMap = new HashMap<String, Integer>();
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+      "pfp.parameters", ""));
+    
+    Map<String,Integer> fMap = new HashMap<String,Integer>();
     int i = 0;
-    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList",
+      context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
-
-    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList", context.getConfiguration())
-        .entrySet()) {
+    
+    for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, "gList",
+      context.getConfiguration()).entrySet()) {
       gListInt.put(fMap.get(e.getKey()), e.getValue());
     }
-
+    
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Fri Feb  5 11:00:25 2010
@@ -36,7 +36,7 @@
 import org.apache.mahout.common.Parameters;
 import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
 import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
-import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
+import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConverter;
 import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
 import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
 import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
@@ -49,93 +49,92 @@
  */
 
 public class ParallelFPGrowthReducer extends
-    Reducer<LongWritable, TransactionTree, Text, TopKStringPatterns> {
-
-  private final List<Pair<Integer, Long>> fList = new ArrayList<Pair<Integer, Long>>();
-
+    Reducer<LongWritable,TransactionTree,Text,TopKStringPatterns> {
+  
+  private final List<Pair<Integer,Long>> fList = new ArrayList<Pair<Integer,Long>>();
+  
   private final List<String> featureReverseMap = new ArrayList<String>();
-
-  private final Map<String, Integer> fMap = new HashMap<String, Integer>();
-
+  
+  private final Map<String,Integer> fMap = new HashMap<String,Integer>();
+  
   private final List<String> fRMap = new ArrayList<String>();
-
-  private final Map<Long, List<Integer>> groupFeatures = new HashMap<Long, List<Integer>>();
-
+  
+  private final Map<Long,List<Integer>> groupFeatures = new HashMap<Long,List<Integer>>();
+  
   private int maxHeapSize = 50;
-
+  
   private int minSupport = 3;
-
+  
   @Override
-  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
-      Context context) throws IOException {
+  protected void reduce(LongWritable key,
+                        Iterable<TransactionTree> values,
+                        Context context) throws IOException {
     TransactionTree cTree = new TransactionTree();
     int nodes = 0;
     for (TransactionTree tr : values) {
-      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
+      Iterator<Pair<List<Integer>,Long>> it = tr.getIterator();
       while (it.hasNext()) {
-        Pair<List<Integer>, Long> p = it.next();
+        Pair<List<Integer>,Long> p = it.next();
         nodes += cTree.addPattern(p.getFirst(), p.getSecond());
       }
     }
-   
-    List<Pair<Integer, Long>> localFList = new ArrayList<Pair<Integer, Long>>();
-    for (Entry<Integer, MutableLong> fItem : cTree.generateFList().entrySet()) {
-      localFList.add(new Pair<Integer, Long>(fItem.getKey(), fItem.getValue()
+    
+    List<Pair<Integer,Long>> localFList = new ArrayList<Pair<Integer,Long>>();
+    for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) {
+      localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue()
           .toLong()));
-     
+      
     }
-
-    Collections.sort(localFList, new Comparator<Pair<Integer, Long>>() {
-
+    
+    Collections.sort(localFList, new Comparator<Pair<Integer,Long>>() {
+      
       @Override
-      public int compare(Pair<Integer, Long> o1, Pair<Integer, Long> o2) {
+      public int compare(Pair<Integer,Long> o1, Pair<Integer,Long> o2) {
         int ret = o2.getSecond().compareTo(o1.getSecond());
         if (ret != 0) {
           return ret;
         }
         return o1.getFirst().compareTo(o2.getFirst());
       }
-
+      
     });
     
-
     FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
-    fpGrowth
-        .generateTopKFrequentPatterns(
-            cTree.getIterator(),
-            localFList,
-            minSupport,
-            maxHeapSize,
-            new HashSet<Integer>(groupFeatures.get(key.get())),
-            new IntegerStringOutputConvertor(
-                new ContextWriteOutputCollector<LongWritable, TransactionTree, Text, TopKStringPatterns>(
-                    context), featureReverseMap),
-            new ContextStatusUpdater<LongWritable, TransactionTree, Text, TopKStringPatterns>(
-                context));
+    fpGrowth.generateTopKFrequentPatterns(
+        cTree.getIterator(),
+        localFList,
+        minSupport,
+        maxHeapSize,
+        new HashSet<Integer>(groupFeatures.get(key.get())),
+        new IntegerStringOutputConverter(
+            new ContextWriteOutputCollector<LongWritable,TransactionTree,Text,TopKStringPatterns>(
+                context), featureReverseMap),
+        new ContextStatusUpdater<LongWritable,TransactionTree,Text,TopKStringPatterns>(
+            context));
   }
-
+  
   @Override
   protected void setup(Context context) throws IOException,
-      InterruptedException {
-
+                                       InterruptedException {
+    
     super.setup(context);
     Parameters params = Parameters.fromString(context.getConfiguration().get(
-        "pfp.parameters", ""));
-
+      "pfp.parameters", ""));
+    
     int i = 0;
-    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
-        context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList",
+      context.getConfiguration())) {
       featureReverseMap.add(e.getFirst());
       fMap.put(e.getFirst(), i);
       fRMap.add(e.getFirst());
-      fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
-
+      fList.add(new Pair<Integer,Long>(i++, e.getSecond()));
+      
     }
-
-    Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList", context
+    
+    Map<String,Long> gList = PFPGrowth.deserializeMap(params, "gList", context
         .getConfiguration());
-
-    for (Entry<String, Long> entry : gList.entrySet()) {
+    
+    for (Entry<String,Long> entry : gList.entrySet()) {
       List<Integer> groupList = groupFeatures.get(entry.getValue());
       Integer itemInteger = fMap.get(entry.getKey());
       if (groupList != null) {
@@ -145,12 +144,12 @@
         groupList.add(itemInteger);
         groupFeatures.put(entry.getValue(), groupList);
       }
-
+      
     }
     maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
     minSupport = Integer.valueOf(params.get("minSupport", "3"));
-    FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params
-        .get("treeCacheSize", Integer
-            .toString(FPTreeDepthCache.getFirstLevelCacheSize()))));
+    FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params.get(
+      "treeCacheSize", Integer.toString(FPTreeDepthCache
+          .getFirstLevelCacheSize()))));
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Fri Feb  5 11:00:25 2010
@@ -35,51 +35,58 @@
 import org.apache.mahout.common.Parameters;
 
 /**
- * {@link TransactionSortingMapper} maps each transaction to all unique items groups in the transaction.
- * mapper outputs the group id as key and the transaction as value
+ * {@link TransactionSortingMapper} maps each transaction to all unique items
+ * groups in the transaction. mapper outputs the group id as key and the
+ * transaction as value
  * 
  */
-public class TransactionSortingMapper extends Mapper<LongWritable, Text, LongWritable, TransactionTree> {
-
-  private final Map<String, Integer> fMap = new HashMap<String, Integer>();
-
-  private Pattern splitter = null;
-
+public class TransactionSortingMapper extends
+    Mapper<LongWritable,Text,LongWritable,TransactionTree> {
+  
+  private final Map<String,Integer> fMap = new HashMap<String,Integer>();
+  
+  private Pattern splitter;
+  
   @Override
   protected void map(LongWritable offset, Text input, Context context) throws IOException,
-      InterruptedException {
-
+                                                                      InterruptedException {
+    
     String[] items = splitter.split(input.toString());
     Set<String> uniqueItems = new HashSet<String>(Arrays.asList(items));
-
+    
     List<Integer> itemSet = new ArrayList<Integer>();
     for (String item : uniqueItems) { // remove items not in the fList
       if (fMap.containsKey(item) && item.trim().length() != 0) {
         itemSet.add(fMap.get(item));
       }
     }
-
+    
     Collections.sort(itemSet);
-
+    
     Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
-
+    
     if (prunedItems.length > 0) {
-      context.write(new LongWritable(prunedItems[0]), new TransactionTree(prunedItems, 1L));
+      context.write(new LongWritable(prunedItems[0]), new TransactionTree(
+          prunedItems, 1L));
     }
-
+    
   }
-
+  
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+                                       InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
-
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+      "pfp.parameters", ""));
+    
     int i = 0;
-    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList",
+      context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
-
-    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
-
+    
+    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
+        .toString()));
+    
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java Fri Feb  5 11:00:25 2010
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+
 /**
  * {@link TransactionSortingReducer} takes each group of transactions and runs
  * Vanilla FPGrowth on it and outputs the the Top K frequent Patterns for each
@@ -29,14 +30,17 @@
  */
 
 public class TransactionSortingReducer extends
-    Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
-
-  private static final LongWritable one = new LongWritable(1);
+    Reducer<LongWritable,TransactionTree,LongWritable,TransactionTree> {
+  
+  private static final LongWritable ONE = new LongWritable(1);
+  
   @Override
-  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
-      Context context) throws IOException, InterruptedException {
+  protected void reduce(LongWritable key,
+                        Iterable<TransactionTree> values,
+                        Context context) throws IOException,
+                                        InterruptedException {
     for (TransactionTree tr : values) {
-      context.write(one, tr);
+      context.write(ONE, tr);
     }
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java Fri Feb  5 11:00:25 2010
@@ -38,24 +38,34 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A compact representation of transactions modeled on the lines to
+ * {@link FPTree} This reduces plenty of space and speeds up Map/Reduce of
+ * {@link PFPGrowth} algorithm by reducing data size passed from the Mapper to
+ * the reducer where {@link FPGrowth} mining is done
+ */
 public final class TransactionTree implements Writable {
-
-  public final class TransactionTreeIterator implements Iterator<Pair<List<Integer>, Long>> {
-
+  /**
+   * Generates a List of transactions view of Transaction Tree by doing Depth
+   * First Traversal on the tree structure
+   */
+  public final class TransactionTreeIterator implements
+      Iterator<Pair<List<Integer>,Long>> {
+    
     private final Stack<int[]> depth = new Stack<int[]>();
-
+    
     public TransactionTreeIterator() {
       depth.push(new int[] {0, -1});
     }
-
+    
     @Override
     public boolean hasNext() {
       return !depth.isEmpty();
     }
-
+    
     @Override
-    public Pair<List<Integer>, Long> next() {
-
+    public Pair<List<Integer>,Long> next() {
+      
       long sum;
       int childId;
       do {
@@ -70,22 +80,23 @@
         top[1]++;
         childId = nodeChildren[top[0]][top[1]];
         depth.push(new int[] {childId, -1});
-
+        
         sum = 0;
         for (int i = childCount[childId] - 1; i >= 0; i--) {
           sum += nodeCount[nodeChildren[childId][i]];
         }
       } while (sum == nodeCount[childId]);
-
+      
       List<Integer> data = new ArrayList<Integer>();
       Iterator<int[]> it = depth.iterator();
       it.next();
       while (it.hasNext()) {
         data.add(attribute[it.next()[0]]);
       }
-
-      Pair<List<Integer>, Long> returnable = new Pair<List<Integer>, Long>(data, nodeCount[childId] - sum);
-
+      
+      Pair<List<Integer>,Long> returnable = new Pair<List<Integer>,Long>(data,
+          nodeCount[childId] - sum);
+      
       int[] top = depth.peek();
       while (top[1] + 1 == childCount[top[0]]) {
         depth.pop();
@@ -96,43 +107,44 @@
       }
       return returnable;
     }
-
+    
     @Override
     public void remove() {
       throw new UnsupportedOperationException();
     }
-
+    
   }
-
+  
   private static final int DEFAULT_CHILDREN_INITIAL_SIZE = 2;
-
+  
   private static final int DEFAULT_INITIAL_SIZE = 8;
   
   private static final float GROWTH_RATE = 1.5f;
-
-  private static final Logger log = LoggerFactory.getLogger(TransactionTree.class);
-
+  
+  private static final Logger log = LoggerFactory
+      .getLogger(TransactionTree.class);
+  
   private static final int ROOTNODEID = 0;
-
+  
   private int[] attribute;
-
+  
   private int[] childCount;
-
+  
   private int[][] nodeChildren;
-
+  
   private long[] nodeCount;
-
-  private int nodes = 0;
-
-  private boolean representedAsList = false;
-
-  private List<Pair<List<Integer>, Long>> transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
-
+  
+  private int nodes;
+  
+  private boolean representedAsList;
+  
+  private List<Pair<List<Integer>,Long>> transactionSet = new ArrayList<Pair<List<Integer>,Long>>();
+  
   public TransactionTree() {
     this(DEFAULT_INITIAL_SIZE);
     representedAsList = false;
   }
-
+  
   public TransactionTree(int size) {
     if (size < DEFAULT_INITIAL_SIZE) {
       size = DEFAULT_INITIAL_SIZE;
@@ -144,17 +156,18 @@
     createRootNode();
     representedAsList = false;
   }
-
+  
   public TransactionTree(Integer[] items, Long support) {
     representedAsList = true;
-    transactionSet.add(new Pair<List<Integer>, Long>(Arrays.asList(items), support));
+    transactionSet.add(new Pair<List<Integer>,Long>(Arrays.asList(items),
+        support));
   }
-
-  public TransactionTree(List<Pair<List<Integer>, Long>> transactionSet) {
+  
+  public TransactionTree(List<Pair<List<Integer>,Long>> transactionSet) {
     representedAsList = true;
     this.transactionSet = transactionSet;
   }
-
+  
   public void addChild(int parentNodeId, int childnodeId) {
     int length = childCount[parentNodeId];
     if (length >= nodeChildren[parentNodeId].length) {
@@ -162,9 +175,9 @@
     }
     nodeChildren[parentNodeId][length++] = childnodeId;
     childCount[parentNodeId] = length;
-
+    
   }
-
+  
   public boolean addCount(int nodeId, long nextNodeCount) {
     if (nodeId < nodes) {
       this.nodeCount[nodeId] += nextNodeCount;
@@ -172,13 +185,13 @@
     }
     return false;
   }
-
+  
   public int addPattern(List<Integer> myList, long addCount) {
     int temp = ROOTNODEID;
     int ret = 0;
     boolean addCountMode = true;
     for (int attributeValue : myList) {
-
+      
       int child;
       if (addCountMode) {
         child = childWithAttribute(temp, attributeValue);
@@ -197,18 +210,18 @@
     }
     return ret;
   }
-
+  
   public int attribute(int nodeId) {
     return this.attribute[nodeId];
   }
-
+  
   public int childAtIndex(int nodeId, int index) {
     if (childCount[nodeId] < index) {
       return -1;
     }
     return nodeChildren[nodeId][index];
   }
-
+  
   public int childCount() {
     int sum = 0;
     for (int i = 0; i < nodes; i++) {
@@ -216,11 +229,11 @@
     }
     return sum;
   }
-
+  
   public int childCount(int nodeId) {
     return childCount[nodeId];
   }
-
+  
   public int childWithAttribute(int nodeId, int childAttribute) {
     int length = childCount[nodeId];
     for (int i = 0; i < length; i++) {
@@ -230,18 +243,18 @@
     }
     return -1;
   }
-
+  
   public long count(int nodeId) {
     return nodeCount[nodeId];
   }
-
-  public Map<Integer, MutableLong> generateFList() {
-    Map<Integer, MutableLong> frequencyList = new HashMap<Integer, MutableLong>();
-    Iterator<Pair<List<Integer>, Long>> it = getIterator();
+  
+  public Map<Integer,MutableLong> generateFList() {
+    Map<Integer,MutableLong> frequencyList = new HashMap<Integer,MutableLong>();
+    Iterator<Pair<List<Integer>,Long>> it = getIterator();
     int items = 0;
     int count = 0;
     while (it.hasNext()) {
-      Pair<List<Integer>, Long> p = it.next();
+      Pair<List<Integer>,Long> p = it.next();
       items += p.getFirst().size();
       count++;
       for (Integer i : p.getFirst()) {
@@ -253,36 +266,37 @@
     }
     return frequencyList;
   }
-
+  
   public TransactionTree getCompressedTree() {
     TransactionTree ctree = new TransactionTree();
-    Iterator<Pair<List<Integer>, Long>> it = getIterator();
-    final Map<Integer, MutableLong> fList = generateFList();
+    Iterator<Pair<List<Integer>,Long>> it = getIterator();
+    final Map<Integer,MutableLong> fList = generateFList();
     int node = 0;
     Comparator<Integer> comparator = new Comparator<Integer>() {
-
+      
       @Override
       public int compare(Integer o1, Integer o2) {
         return fList.get(o2).compareTo(fList.get(o1));
       }
-
+      
     };
     int size = 0;
-    List<Pair<List<Integer>, Long>> compressedTransactionSet = new ArrayList<Pair<List<Integer>, Long>>();
+    List<Pair<List<Integer>,Long>> compressedTransactionSet = new ArrayList<Pair<List<Integer>,Long>>();
     while (it.hasNext()) {
-      Pair<List<Integer>, Long> p = it.next();
+      Pair<List<Integer>,Long> p = it.next();
       Collections.sort(p.getFirst(), comparator);
       compressedTransactionSet.add(p);
       node += ctree.addPattern(p.getFirst(), p.getSecond());
       size += p.getFirst().size() + 2;
     }
-
+    
     log.debug("Nodes in UnCompressed Tree: {} ", nodes);
-    log
-        .debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 + this.childCount() * 4)
-            / (double) 1000000);
+    log.debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 + this
+        .childCount() * 4)
+                                            / (double) 1000000);
     log.debug("Nodes in Compressed Tree: {} ", node);
-    log.debug("Compressed Tree Size: {}", (node * 4 * 4 + ctree.childCount() * 4) / (double) 1000000);
+    log.debug("Compressed Tree Size: {}",
+      (node * 4 * 4 + ctree.childCount() * 4) / (double) 1000000);
     log.debug("TransactionSet Size: {}", (size * 4) / (double) 1000000);
     if (node * 4 * 4 + ctree.childCount() * 4 <= size * 4) {
       return ctree;
@@ -291,45 +305,47 @@
       return ctree;
     }
   }
-
-  public Iterator<Pair<List<Integer>, Long>> getIterator() {
+  
+  public Iterator<Pair<List<Integer>,Long>> getIterator() {
     if (this.isTreeEmpty() && !representedAsList) {
-      throw new IllegalStateException("This is a bug. Please report this to mahout-user list");
+      throw new IllegalStateException(
+          "This is a bug. Please report this to mahout-user list");
     } else if (representedAsList) {
       return transactionSet.iterator();
     } else {
       return new TransactionTreeIterator();
     }
   }
-
+  
   public boolean isTreeEmpty() {
     return nodes <= 1;
   }
-
+  
   @Override
   public void readFields(DataInput in) throws IOException {
     representedAsList = in.readBoolean();
-
+    
     VIntWritable vInt = new VIntWritable();
     VLongWritable vLong = new VLongWritable();
-
+    
     if (representedAsList) {
-      transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
+      transactionSet = new ArrayList<Pair<List<Integer>,Long>>();
       vInt.readFields(in);
       int numTransactions = vInt.get();
       for (int i = 0; i < numTransactions; i++) {
         vLong.readFields(in);
         Long support = vLong.get();
-
+        
         vInt.readFields(in);
         int length = vInt.get();
-
+        
         Integer[] items = new Integer[length];
         for (int j = 0; j < length; j++) {
           vInt.readFields(in);
           items[j] = vInt.get();
         }
-        Pair<List<Integer>, Long> transaction = new Pair<List<Integer>, Long>(Arrays.asList(items), support);
+        Pair<List<Integer>,Long> transaction = new Pair<List<Integer>,Long>(
+            Arrays.asList(items), support);
         transactionSet.add(transaction);
       }
     } else {
@@ -354,7 +370,7 @@
       }
     }
   }
-
+  
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeBoolean(representedAsList);
@@ -364,14 +380,14 @@
       vInt.set(transactionSet.size());
       vInt.write(out);
       for (int i = 0, j = transactionSet.size(); i < j; i++) {
-        Pair<List<Integer>, Long> transaction = transactionSet.get(i);
-
+        Pair<List<Integer>,Long> transaction = transactionSet.get(i);
+        
         vLong.set(transaction.getSecond());
         vLong.write(out);
-
+        
         vInt.set(transaction.getFirst().size());
         vInt.write(out);
-
+        
         for (Integer item : transaction.getFirst()) {
           vInt.set(item);
           vInt.write(out);
@@ -394,24 +410,24 @@
       }
     }
   }
-
+  
   private int createNode(int parentNodeId, int attributeValue, long count) {
     if (nodes >= this.attribute.length) {
       resize();
     }
-
+    
     childCount[nodes] = 0;
     this.attribute[nodes] = attributeValue;
     nodeCount[nodes] = count;
     if (nodeChildren[nodes] == null) {
       nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
     }
-
+    
     int childNodeId = nodes++;
     addChild(parentNodeId, childNodeId);
     return childNodeId;
   }
-
+  
   private int createRootNode() {
     childCount[nodes] = 0;
     attribute[nodes] = -1;
@@ -421,29 +437,29 @@
     }
     return nodes++;
   }
-
+  
   private void resize() {
     int size = (int) (GROWTH_RATE * nodes);
     if (size < DEFAULT_INITIAL_SIZE) {
       size = DEFAULT_INITIAL_SIZE;
     }
-
+    
     int[] oldChildCount = childCount;
     int[] oldAttribute = attribute;
     long[] oldnodeCount = nodeCount;
     int[][] oldNodeChildren = nodeChildren;
-
+    
     childCount = new int[size];
     attribute = new int[size];
     nodeCount = new long[size];
     nodeChildren = new int[size][];
-
+    
     System.arraycopy(oldChildCount, 0, this.childCount, 0, nodes);
     System.arraycopy(oldAttribute, 0, this.attribute, 0, nodes);
     System.arraycopy(oldnodeCount, 0, this.nodeCount, 0, nodes);
     System.arraycopy(oldNodeChildren, 0, this.nodeChildren, 0, nodes);
   }
-
+  
   private void resizeChildren(int nodeId) {
     int length = childCount[nodeId];
     int size = (int) (GROWTH_RATE * length);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java Fri Feb  5 11:00:25 2010
@@ -19,20 +19,27 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-
-public class ContextStatusUpdater<IK extends Writable, IV extends Writable,
-    K extends Writable, V extends Writable> implements StatusUpdater {
-
+/**
+ * Updates the Context object of a {@link Reducer} class
+ *
+ * @param <IK>
+ * @param <IV>
+ * @param <K>
+ * @param <V>
+ */
+public class ContextStatusUpdater<IK extends Writable,IV extends Writable,
+    K extends Writable,V extends Writable> implements StatusUpdater {
+  
   private static final long PERIOD = 10000; // Update every 10 seconds
-
-  private final Reducer<IK, IV, K, V>.Context context;
-
+  
+  private final Reducer<IK,IV,K,V>.Context context;
+  
   private long time = System.currentTimeMillis();
-
-  public ContextStatusUpdater(Reducer<IK, IV, K, V>.Context context) {
+  
+  public ContextStatusUpdater(Reducer<IK,IV,K,V>.Context context) {
     this.context = context;
   }
-
+  
   @Override
   public void update(String status) {
     long curTime = System.currentTimeMillis();
@@ -40,7 +47,7 @@
       time = curTime;
       context.setStatus("Processing FPTree: " + status);
     }
-
+    
   }
-
+  
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Fri Feb  5 11:00:25 2010
@@ -25,18 +25,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable,
-    K extends Writable, V extends Writable> implements OutputCollector<K, V> {
-
+/**
+ * An output collector for {@link Reducer} for PFPGrowth which updates the
+ * status as well as writes the patterns generated by the algorithm
+ * 
+ * @param <IK>
+ * @param <IV>
+ * @param <K>
+ * @param <V>
+ */
+public class ContextWriteOutputCollector<IK extends Writable,IV extends Writable,K extends Writable,V extends Writable>
+    implements OutputCollector<K,V> {
+  
   private static final Logger log = LoggerFactory
       .getLogger(ContextWriteOutputCollector.class);
-
-  private final Reducer<IK, IV, K, V>.Context context;
-
-  public ContextWriteOutputCollector(Reducer<IK, IV, K, V>.Context context) {
+  
+  private final Reducer<IK,IV,K,V>.Context context;
+  
+  public ContextWriteOutputCollector(Reducer<IK,IV,K,V>.Context context) {
     this.context = context;
   }
-
+  
   @Override
   public final void collect(K key, V value) throws IOException {
     try {
@@ -46,5 +55,5 @@
       log.error("{}", e);
     }
   }
-
+  
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java Fri Feb  5 11:00:25 2010
@@ -23,17 +23,24 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 
-public class SequenceFileOutputCollector<K extends Writable, V extends Writable>
-    implements OutputCollector<K, V> {
+/**
+ * Collects the {@link Writable} key and {@link Writable} value, and writes them
+ * into a {@link SequenceFile}
+ * 
+ * @param <K>
+ * @param <V>
+ */
+public class SequenceFileOutputCollector<K extends Writable,V extends Writable>
+    implements OutputCollector<K,V> {
   private final SequenceFile.Writer writer;
-
+  
   public SequenceFileOutputCollector(SequenceFile.Writer writer) {
     this.writer = writer;
   }
-
+  
   @Override
   public final void collect(K key, V value) throws IOException {
     writer.append(key, value);
   }
-
+  
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java Fri Feb  5 11:00:25 2010
@@ -17,7 +17,11 @@
 
 package org.apache.mahout.fpm.pfpgrowth.convertors;
 
+/**
+ * An interface for a status updater
+ * 
+ */
 public interface StatusUpdater {
-
+  
   void update(String status);
 }

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConverter.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConverter.java?rev=906896&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConverter.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConverter.java Fri Feb  5 11:00:25 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
+
+/**
+ * An output converter which converts the output patterns and collects them in
+ * a {@link FrequentPatternMaxHeap}
+ * 
+ * @param <A>
+ */
+public final class TopKPatternsOutputConverter<A extends Comparable<? super A>>
+    implements OutputCollector<Integer,FrequentPatternMaxHeap> {
+  
+  private OutputCollector<A,List<Pair<List<A>,Long>>> collector = null;
+  
+  private Map<Integer,A> reverseMapping;
+  
+  public TopKPatternsOutputConverter(OutputCollector<A,List<Pair<List<A>,Long>>> collector,
+                                     Map<Integer,A> reverseMapping) {
+    this.collector = collector;
+    this.reverseMapping = reverseMapping;
+  }
+  
+  @Override
+  public void collect(Integer key, FrequentPatternMaxHeap value) throws IOException {
+    List<Pair<List<A>,Long>> perAttributePatterns = new ArrayList<Pair<List<A>,Long>>();
+    PriorityQueue<Pattern> t = value.getHeap();
+    while (!t.isEmpty()) {
+      Pattern itemSet = t.poll();
+      List<A> frequentPattern = new ArrayList<A>();
+      for (int j = 0; j < itemSet.length(); j++) {
+        frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
+      }
+      Collections.sort(frequentPattern);
+      
+      Pair<List<A>,Long> returnItemSet = new Pair<List<A>,Long>(
+          frequentPattern, itemSet.support());
+      perAttributePatterns.add(returnItemSet);
+    }
+    Collections.reverse(perAttributePatterns);
+    
+    collector.collect(reverseMapping.get(key), perAttributePatterns);
+  }
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Fri Feb  5 11:00:25 2010
@@ -23,45 +23,51 @@
 
 import org.apache.mahout.common.Pair;
 
-public class TransactionIterator<AP> implements Iterator<Pair<int[], Long>> {
-  private Map<AP, Integer> attributeIdMapping = null;
-
-  private Iterator<Pair<List<AP>, Long>> iterator = null;
-
-  private int[] transactionBuffer = null;
-
-  public TransactionIterator(Iterator<Pair<List<AP>, Long>> iterator,
-      Map<AP, Integer> attributeIdMapping) {
+/**
+ * Iterates over a Transaction and outputs the transaction integer id mapping
+ * and the support of the transaction
+ * 
+ * @param <AP>
+ */
+public class TransactionIterator<AP> implements Iterator<Pair<int[],Long>> {
+  private Map<AP,Integer> attributeIdMapping;
+  
+  private Iterator<Pair<List<AP>,Long>> iterator;
+  
+  private int[] transactionBuffer;
+  
+  public TransactionIterator(Iterator<Pair<List<AP>,Long>> iterator,
+                             Map<AP,Integer> attributeIdMapping) {
     this.attributeIdMapping = attributeIdMapping;
     this.iterator = iterator;
     transactionBuffer = new int[attributeIdMapping.size()];
   }
-
+  
   @Override
   public final boolean hasNext() {
     return iterator.hasNext();
   }
-
+  
   @Override
-  public final Pair<int[], Long> next() {
-    Pair<List<AP>, Long> transaction = iterator.next();
+  public final Pair<int[],Long> next() {
+    Pair<List<AP>,Long> transaction = iterator.next();
     int index = 0;
-
+    
     for (AP attribute : transaction.getFirst()) {
       if (attributeIdMapping.containsKey(attribute)) {
         transactionBuffer[index++] = attributeIdMapping.get(attribute);
       }
     }
-
+    
     int[] transactionList = new int[index];
     System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
-    return new Pair<int[], Long>(transactionList, transaction.getSecond());
-
+    return new Pair<int[],Long>(transactionList, transaction.getSecond());
+    
   }
-
+  
   @Override
   public final void remove() {
     iterator.remove();
   }
-
+  
 }

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConverter.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConverter.java?rev=906896&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConverter.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConverter.java Fri Feb  5 11:00:25 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.integer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+
+/**
+ * Collects the Patterns with Integer id and Long support and converts them to
+ * Pattern of Strings based on a reverse feature lookup map.
+ */
+public final class IntegerStringOutputConverter implements
+    OutputCollector<Integer,List<Pair<List<Integer>,Long>>> {
+  
+  private OutputCollector<Text,TopKStringPatterns> collector;
+  
+  private List<String> featureReverseMap;
+  
+  public IntegerStringOutputConverter(OutputCollector<Text,TopKStringPatterns> collector,
+                                      List<String> featureReverseMap) {
+    this.collector = collector;
+    this.featureReverseMap = featureReverseMap;
+  }
+  
+  @Override
+  public void collect(Integer key, List<Pair<List<Integer>,Long>> value) throws IOException {
+    String stringKey = featureReverseMap.get(key);
+    List<Pair<List<String>,Long>> stringValues = new ArrayList<Pair<List<String>,Long>>();
+    for (Pair<List<Integer>,Long> e : value) {
+      List<String> pattern = new ArrayList<String>();
+      for (Integer i : e.getFirst()) {
+        pattern.add(featureReverseMap.get(i));
+      }
+      stringValues.add(new Pair<List<String>,Long>(pattern, e.getSecond()));
+    }
+    
+    collector
+        .collect(new Text(stringKey), new TopKStringPatterns(stringValues));
+  }
+  
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Fri Feb  5 11:00:25 2010
@@ -21,29 +21,31 @@
 import java.util.List;
 
 import org.apache.mahout.common.IntegerTuple;
-
+/**
+ * Iterates over an integer tuple and output it as a list
+ *
+ */
 public final class IntegerTupleIterator implements Iterator<List<Integer>> {
-
-  private Iterator<IntegerTuple> iterator = null;
+  
+  private Iterator<IntegerTuple> iterator;
   
   public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
     this.iterator = iterator;
   }
-
+  
   @Override
   public boolean hasNext() {
     return iterator.hasNext();
   }
-
+  
   @Override
   public List<Integer> next() {
     IntegerTuple transaction = iterator.next();
     return transaction.getEntries();
   }
-
+  
   @Override
   public void remove() {
     iterator.remove();
   }
-
 }

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConverter.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConverter.java?rev=906896&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConverter.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConverter.java Fri Feb  5 11:00:25 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Collects a string pattern in a MaxHeap and outputs the top K patterns
+ * 
+ */
+public final class StringOutputConverter implements
+    OutputCollector<String,List<Pair<List<String>,Long>>> {
+  
+  private OutputCollector<Text,TopKStringPatterns> collector;
+  
+  public StringOutputConverter(OutputCollector<Text,TopKStringPatterns> collector) {
+    this.collector = collector;
+  }
+  
+  @Override
+  public void collect(String key,
+                      List<Pair<List<String>,Long>> value) throws IOException {
+    collector.collect(new Text(key), new TopKStringPatterns(value));
+  }
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java?rev=906896&r1=906895&r2=906896&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java Fri Feb  5 11:00:25 2010
@@ -22,9 +22,13 @@
 
 import org.apache.mahout.common.StringTuple;
 
+/**
+ * Iterate over the StringTuple as an iterator of <code>List&lt;String&gt;</code>
+ *
+ */
 public final class StringTupleIterator implements Iterator<List<String>> {
 
-  private Iterator<StringTuple> iterator = null;
+  private Iterator<StringTuple> iterator;
 
   public StringTupleIterator(Iterator<StringTuple> iterator) {
     this.iterator = iterator;



Mime
View raw message