mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From robina...@apache.org
Subject svn commit: r1231700 [2/3] - in /mahout/trunk/core/src: main/java/org/apache/mahout/fpm/pfpgrowth/ main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/ main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth2/ test/java/org/apache/mahout/fpm/pfpgrowth/ test/...
Date Sun, 15 Jan 2012 16:12:15 GMT
Added: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth2/FPTree.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth2/FPTree.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth2/FPTree.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth2/FPTree.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.fpgrowth2;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.list.IntArrayList;
+import org.apache.mahout.math.list.LongArrayList;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
+
+/**
+ * A straightforward implementation of FPTrees as described in Han et. al.
+ */
+public class FPTree {
+  private static final Logger log = LoggerFactory.getLogger(FPTree.class);
+
+  private final AttrComparator attrComparator = new AttrComparator();
+  private FPNode root;
+  private long minSupport;
+  private LongArrayList attrCountList;
+  private OpenIntObjectHashMap attrNodeLists; 
+
+  public class FPNode {
+    private FPNode parent;
+    private OpenIntObjectHashMap childMap;
+    private int attribute;
+    private long count;
+
+    private FPNode(FPNode parent, int attribute, long count) {
+      this.parent = parent;
+      this.attribute = attribute;
+      this.count = count;
+      this.childMap = new OpenIntObjectHashMap();
+    }
+
+    private void addChild(FPNode child) {
+      this.childMap.put(child.attribute(), child);
+    }
+
+    public Iterable<FPNode> children() {
+      return childMap.values();
+    }
+
+    public int numChildren() {
+      return childMap.size();
+    }
+
+    public FPNode parent() {
+      return parent;
+    }
+
+    public FPNode child(int attribute) {
+      return (FPNode) childMap.get(attribute);
+    }
+
+    public int attribute() {
+      return attribute;
+    }
+
+    public void accumulate(long incr) {
+      count = count + incr;
+    }
+
+    public long count() {
+      return count;
+    }
+
+  }
+
+  /**
+   * Creates an FPTree using the attribute counts in attrCountList.
+   *
+   * Note that the counts in attrCountList are assumed to be complete;
+   * they are not updated as the tree is modified.
+   */
+  public FPTree(LongArrayList attrCountList, long minSupport) {
+    this.root = new FPNode(null, -1, 0);
+    this.attrCountList = attrCountList;
+    this.attrNodeLists = new OpenIntObjectHashMap();
+    this.minSupport = minSupport;
+  }
+
+  /**
+   * Creates an FPTree using the attribute counts in attrCounts.
+   *
+   * Note that the counts in attrCounts are assumed to be complete;
+   * they are not updated as the tree is modified.
+   */
+  public FPTree(long[] attrCounts, long minSupport) {
+    this.root = new FPNode(null, -1, 0);
+    this.attrCountList = new LongArrayList();
+    for (int i = 0; i < attrCounts.length; i++) 
+      if (attrCounts[i] > 0) {
+        if (attrCountList.size() < (i+1)) {
+          attrCountList.setSize(i+1);
+        }
+        attrCountList.set(i, attrCounts[i]);
+      }
+    this.attrNodeLists = new OpenIntObjectHashMap();
+    this.minSupport = minSupport;
+  }
+
+
+  /**
+   * Returns the count of the given attribute, as supplied on construction.
+   */
+  public long headerCount(int attribute) {
+    return attrCountList.get(attribute);
+  }
+
+  /**
+   * Returns the root node of the tree.
+   */
+  public FPNode root() {
+    return root;
+  }
+
+  /**
+   * Adds an itemset with the given occurrance count.
+   */
+  public void accumulate(IntArrayList argItems, long count) {
+    // boxed primitive used so we can use custom comparitor in sort
+    List<Integer> items = Lists.newArrayList();
+    for (int i = 0; i < argItems.size(); i++) {
+      items.add(argItems.get(i));
+    }
+    Collections.sort(items, attrComparator);
+    
+    FPNode currNode = root;
+    for (int i = 0; i < items.size(); i++) {
+      int item = items.get(i);
+      long attrCount = 0;
+      if (item < attrCountList.size())
+        attrCount = attrCountList.get(item);
+      if (attrCount < minSupport)
+        continue;
+
+      FPNode next = currNode.child(item);
+      if (next == null) {
+        next = new FPNode(currNode, item, count);
+        currNode.addChild(next);
+        List<FPNode> nodeList = (List<FPNode>) attrNodeLists.get(item);
+        if (nodeList == null) {
+          nodeList = Lists.newArrayList();
+          attrNodeLists.put(item, nodeList);
+        }
+        nodeList.add(next);
+      } else {
+        next.accumulate(count);
+      }
+      currNode = next;
+    }
+  } 
+
+  /**
+   * Adds an itemset with the given occurrance count.
+   */
+  public void accumulate(List<Integer> argItems, long count) {
+    List<Integer> items = Lists.newArrayList();
+    items.addAll(argItems);
+    Collections.sort(items, attrComparator);
+    
+    FPNode currNode = root;
+    for (int i = 0; i < items.size(); i++) {
+      int item = items.get(i);
+      long attrCount = attrCountList.get(item);
+      if (attrCount < minSupport)
+        continue;
+
+      FPNode next = currNode.child(item);
+      if (next == null) {
+        next = new FPNode(currNode, item, count);
+        currNode.addChild(next);
+        List<FPNode> nodeList = (List<FPNode>) attrNodeLists.get(item);
+        if (nodeList == null) {
+          nodeList = Lists.newArrayList();
+          attrNodeLists.put(item, nodeList);
+        }
+        nodeList.add(next);
+      } else {
+        next.accumulate(count);
+      }
+      currNode = next;
+    }
+  } 
+
+
+  /**
+   * Returns an Iterable over the attributes in the tree, sorted by
+   * frequency (low to high).
+   */
+  public Iterable<Integer> attrIterable() {
+    List<Integer> attrs = Lists.newArrayList();
+    for (int i = 0; i < attrCountList.size(); i++) {
+      if (attrCountList.get(i) > 0)
+        attrs.add(i);
+    }
+    Collections.sort(attrs, attrComparator);
+    return attrs;
+  }
+
+  /**
+   * Returns an Iterable over the attributes in the tree, sorted by
+   * frequency (high to low).
+   */
+  public Iterable<Integer> attrIterableRev() {
+    List<Integer> attrs = Lists.newArrayList();
+    for (int i = 0; i < attrCountList.size(); i++) {
+      if (attrCountList.get(i) > 0)
+        attrs.add(i);
+    }
+    Collections.sort(attrs, Collections.reverseOrder(attrComparator));
+    return attrs;
+  }
+
+  /**
+   * Returns a conditional FP tree based on the targetAttr, containing
+   * only items that are more frequent.
+   */
+  public FPTree createMoreFreqConditionalTree(int targetAttr) {
+    LongArrayList counts = new LongArrayList();
+    List<FPNode> nodeList = (List<FPNode>) attrNodeLists.get(targetAttr);
+
+    for (FPNode currNode : nodeList) {
+      long pathCount = currNode.count();
+      while (currNode != root) {
+        int currAttr = currNode.attribute();
+        if (counts.size() <= currAttr) {
+          counts.setSize(currAttr+1);
+        }
+        long count = counts.get(currAttr);
+        counts.set(currNode.attribute(), count + pathCount);
+        currNode = currNode.parent();
+      }
+    }
+    if (counts.get(targetAttr) != attrCountList.get(targetAttr))
+      throw new IllegalStateException("mismatched counts for targetAttr="
+                                      +targetAttr+", ("+counts.get(targetAttr)
+                                      +" != "+attrCountList.get(targetAttr)+"); "
+                                      +"thisTree="+this+"\n");
+    counts.set(targetAttr, 0L);
+
+    FPTree toRet = new FPTree(counts, minSupport);
+    IntArrayList attrLst = new IntArrayList();
+    for (FPNode currNode : (List<FPNode>) attrNodeLists.get(targetAttr)) {
+      long count = currNode.count();
+      attrLst.clear();
+      while (currNode != root) {
+        if (currNode.count() < count) 
+          throw new IllegalStateException();
+        attrLst.add(currNode.attribute());
+        currNode = currNode.parent();
+      }
+
+      toRet.accumulate(attrLst, count);      
+    }    
+    return toRet;
+  }
+
+  // biggest count or smallest attr number goes first
+  private class AttrComparator implements Comparator<Integer>{
+    public int compare(Integer a, Integer b) {
+
+      long aCnt = 0;
+      if (a < attrCountList.size())
+        aCnt = attrCountList.get(a);
+      long bCnt = 0;
+      if (b < attrCountList.size())
+        bCnt = attrCountList.get(b);
+      if (aCnt == bCnt)
+        return a - b;
+      return (bCnt - aCnt) < 0 ? -1 : 1;
+    }
+  }
+
+  /**
+   *  Return a pair of trees that result from separating a common prefix
+   *  (if one exists) from the lower portion of this tree.
+   */
+  public Pair<FPTree, FPTree> splitSinglePrefix() {
+    if (root.numChildren() != 1) {
+      return new Pair<FPTree, FPTree>(null, this);
+    }
+    LongArrayList pAttrCountList = new LongArrayList();
+    LongArrayList qAttrCountList = attrCountList.copy();
+
+    FPNode currNode = root;
+    while (currNode.numChildren() == 1) {
+      currNode = currNode.children().iterator().next();
+      if (pAttrCountList.size() <= currNode.attribute())
+        pAttrCountList.setSize(currNode.attribute()+1);
+      pAttrCountList.set(currNode.attribute(), currNode.count());
+      qAttrCountList.set(currNode.attribute(), 0);
+    }
+
+    FPTree pTree = new FPTree(pAttrCountList, minSupport);
+    FPTree qTree = new FPTree(qAttrCountList, minSupport);
+    recursivelyAddPrefixPats(pTree, qTree, root, null);
+
+    return new Pair<FPTree, FPTree>(pTree, qTree);
+  }
+
+  private long recursivelyAddPrefixPats(FPTree pTree, FPTree qTree, FPNode node,
+                                        IntArrayList items) {
+    long added = 0;
+    long count = node.count();
+    int attribute = node.attribute();
+    if (items == null) {
+      // at root
+      if (!(node == root))
+        throw new IllegalStateException();
+      items = new IntArrayList();
+    } else {
+      items.add(attribute);
+    }
+    for (FPNode child : node.children()) {
+      added+= recursivelyAddPrefixPats(pTree, qTree, child, items);
+    }
+    if (added < count) {
+      long toAdd = count - added;
+      pTree.accumulate(items, toAdd);
+      qTree.accumulate(items, toAdd);
+      added+= toAdd;
+    }
+    if (!(node == root)) {
+      int lastIdx = items.size() - 1;
+      if (items.get(lastIdx) != attribute) {
+        throw new IllegalStateException();
+      }
+      items.remove(lastIdx);
+    }
+    return added;
+  }
+
+  private void toStringHelper(StringBuilder sb, FPNode currNode, String prefix) {
+    if (currNode.numChildren() == 0) {
+      sb.append(prefix).append("-{attr:").append(currNode.attribute())
+        .append(", cnt:").append(currNode.count()).append("}\n");
+    } else {
+      StringBuilder newPre = new StringBuilder(prefix);
+      newPre.append("-{attr:").append(currNode.attribute())
+        .append(", cnt:").append(currNode.count()).append('}');
+      StringBuilder fakePre = new StringBuilder();
+      while (fakePre.length() < newPre.length()) {
+        fakePre.append(' ');
+      }
+      int i = 0;
+      for (FPNode child : currNode.children()) 
+        toStringHelper(sb, child, (i++ == 0 ? newPre : fakePre).toString() + '-' + i + "->");
+    }
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("[FPTree\n");
+    toStringHelper(sb, root, "  ");
+    sb.append("]");
+    return sb.toString();
+  }
+
+}
\ No newline at end of file

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTest2.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTest2.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTest2.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTest2.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.StatusUpdater;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+
+public final class FPGrowthRetailDataTest2 extends MahoutTestCase {
+
+  @Test
+  public void testSpecificCaseFromRetailDataMinSup500() throws IOException {
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+    
+    StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+      "retail.dat").openStream()), "\\s+");
+    int pattern_41_36_39 = 0;
+    while (it.hasNext()) {
+      Pair<List<String>,Long> next = it.next();
+      List<String> items = next.getFirst();
+      if (items.contains("41") && items.contains("36") && items.contains("39")) {
+        pattern_41_36_39++;
+      }
+    }
+    
+    final Map<Set<String>,Long> results = Maps.newHashMap();
+    
+    Set<String> returnableFeatures = new HashSet<String>();
+    returnableFeatures.add("41");
+    returnableFeatures.add("36");
+    returnableFeatures.add("39");
+    
+    fp.generateTopKFrequentPatterns(
+      new StringRecordIterator(new FileLineIterable(Resources.getResource("retail.dat").openStream()), "\\s+"),
+
+      fp.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource("retail.dat")
+          .openStream()), "\\s+"), 500), 500, 1000, returnableFeatures,
+      new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+        @Override
+        public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+          for (Pair<List<String>,Long> v : value) {
+            List<String> l = v.getFirst();
+            results.put(new HashSet<String>(l), v.getSecond());
+          }
+        }
+        
+      }, new StatusUpdater() {
+        
+        @Override
+        public void update(String status) {}
+      });
+    
+    assertEquals(Long.valueOf(pattern_41_36_39), results.get(returnableFeatures));
+    
+  }
+
+  private long bestResults(Map<Set<String>, Long> res, Set<String> feats) {
+    Long best = res.get(feats);
+    if (best != null) 
+      return best;
+    else 
+      best = -1L;
+    for (Map.Entry<Set<String>, Long> ent : res.entrySet()) { 
+      Set<String> r = ent.getKey();
+      Long supp = ent.getValue();
+      if (supp <= best) 
+        continue;
+      boolean hasAll = true;
+      for (String f : feats) {
+        if (!r.contains(f)) {
+          hasAll = false;
+          break;
+        }
+      }
+      if (hasAll) 
+        best = supp;
+    }
+    return best;
+  }
+  
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTestVs.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTestVs.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTestVs.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTestVs.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.StatusUpdater;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class FPGrowthRetailDataTestVs extends MahoutTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowthRetailDataTestVs.class);
+
+  private long bestResults(Map<Set<String>, Long> res, Set<String> feats) {
+    Long best = res.get(feats);
+    if (best != null) 
+      return best;
+    else 
+      best = -1L;
+    for (Map.Entry<Set<String>, Long> ent : res.entrySet()) { 
+      Set<String> r = ent.getKey();
+      Long supp = ent.getValue();
+      if (supp <= best) 
+        continue;
+      boolean hasAll = true;
+      for (String f : feats) {
+        if (!r.contains(f)) {
+          hasAll = false;
+          break;
+        }
+      }
+      if (hasAll) 
+        best = supp;
+    }
+    return best;
+  }
+
+  @Test
+  public void testVsWithRetailData() throws IOException {
+    String inputFilename = "retail.dat";
+    int minSupport = 500;
+    Set<String> returnableFeatures = new HashSet<String>();
+    
+    org.apache.mahout.fpm.pfpgrowth.fpgrowth.
+      FPGrowth<String> fp1 = new org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth<String>();
+
+    final Map<Set<String>,Long> results1 = Maps.newHashMap();
+    
+    fp1.generateTopKFrequentPatterns(
+      new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+      fp1.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+           .openStream()), "\\s+"), minSupport), minSupport, 100000, 
+      returnableFeatures,
+      new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+        @Override
+        public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+          for (Pair<List<String>,Long> v : value) {
+            List<String> l = v.getFirst();
+            results1.put(new HashSet<String>(l), v.getSecond());
+            log.info("found pat ["+v.getSecond()+"]: "+ v.getFirst());
+          }
+        }
+        
+      }, new StatusUpdater() {
+        
+        @Override
+        public void update(String status) {}
+      });
+
+    FPGrowthObj<String> fp2 = new FPGrowthObj<String>();
+    final Map<Set<String>,Long> initialResults2 = Maps.newHashMap();
+    fp2.generateTopKFrequentPatterns(
+      new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+      fp2.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+           .openStream()), "\\s+"), minSupport), minSupport, 100000, 
+      new HashSet<String>(),
+      new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+        @Override
+        public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+          for (Pair<List<String>,Long> v : value) {
+            List<String> l = v.getFirst();
+            initialResults2.put(new HashSet<String>(l), v.getSecond());
+            log.info("found pat ["+v.getSecond()+"]: "+ v.getFirst());
+          }
+        }
+        
+      }, new StatusUpdater() {
+        
+        @Override
+        public void update(String status) {}
+      });
+
+    Map<Set<String>, Long> results2 = new HashMap<Set<String>, Long>();    
+    if (!returnableFeatures.isEmpty()) {
+      Map<Set<String>, Long> tmpResult = new HashMap<Set<String>, Long>();
+      for (Map.Entry<Set<String>, Long> result2 : initialResults2.entrySet()) {
+        Set<String> r2feats = result2.getKey();
+        boolean hasSome = false;
+        for (String rf : returnableFeatures) {
+          if (r2feats.contains(rf)) {
+            hasSome = true;
+            break;
+          }
+        }
+        if (hasSome) 
+          tmpResult.put(result2.getKey(), result2.getValue());
+      }
+      results2 = tmpResult;
+    } else {
+      results2 = initialResults2;
+    }
+
+    boolean allMatch = true;
+    int itemsetsChecked = 0;
+    for (Map.Entry<Set<String>, Long> result1 : results1.entrySet()) {
+      itemsetsChecked++;
+      Set<String> feats = result1.getKey();
+      long supp1 = result1.getValue();
+      long supp2 = bestResults(results2, feats);
+      if (supp1 != supp2) {
+        allMatch = false;
+        log.info("mismatch checking results1 ["+supp1+" vs "+supp2+"]: "+feats);
+      }
+    }
+    log.info("checked "+itemsetsChecked+" itemsets iterating through #1");
+
+    itemsetsChecked = 0;
+    for (Map.Entry<Set<String>, Long> result2 : results2.entrySet()) { 
+      itemsetsChecked++;
+      Set<String> feats = result2.getKey();
+      long supp2 = result2.getValue();
+      long supp1 = bestResults(results1, feats);
+      if (supp1 != supp2) {
+        allMatch = false;
+        log.info("mismatch checking results2 [ "+supp1+" vs "+supp2+"]: "+feats);
+      }
+    }
+    log.info("checked "+itemsetsChecked+" itemsets iterating through #2");
+
+    assertEquals( "Had mismatches!", allMatch, true);
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthSyntheticDataTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthSyntheticDataTest.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthSyntheticDataTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthSyntheticDataTest.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.StatusUpdater;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+
+public final class FPGrowthSyntheticDataTest extends MahoutTestCase {
+
+  @Test
+    public void testSpecificCasesFromSynthData() throws IOException {
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+    
+    String inputFilename = "FPGsynth.dat";
+    int minSupport = 50;
+
+    StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+                                                                                                  inputFilename).openStream()), "\\s+");
+    int patternCnt_10_13_1669 = 0;
+    int patternCnt_10_13 = 0;
+    while (it.hasNext()) {
+      Pair<List<String>,Long> next = it.next();
+      List<String> items = next.getFirst();
+      if (items.contains("10") && items.contains("13")) {
+        patternCnt_10_13++;
+        if (items.contains("1669")) {
+          patternCnt_10_13_1669++;
+        }
+      }
+    }
+    
+    if (patternCnt_10_13_1669 < minSupport) 
+      throw new IllegalStateException("the test is broken or data is missing ("
+                                      + patternCnt_10_13_1669+", "
+                                      + patternCnt_10_13+")");
+
+    final Map<Set<String>,Long> results = Maps.newHashMap();
+    
+    Set<String> features_10_13 = new HashSet<String>();
+    features_10_13.add("10");
+    features_10_13.add("13");
+
+    Set<String> returnableFeatures = new HashSet<String>();
+    returnableFeatures.add("10");
+    returnableFeatures.add("13");
+    returnableFeatures.add("1669");
+    
+    fp.generateTopKFrequentPatterns(
+                                    new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+                                    fp.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+                                                                                                   .openStream()), "\\s+"), minSupport), minSupport, 100000, 
+                                    returnableFeatures,
+                                    new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+                                      @Override
+                                        public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+                                        for (Pair<List<String>,Long> v : value) {
+                                          List<String> l = v.getFirst();
+                                          results.put(new HashSet<String>(l), v.getSecond());
+                                          System.out.println("found pat ["+v.getSecond()+"]: "+ v.getFirst());
+                                        }
+                                      }
+        
+                                    }, new StatusUpdater() {
+        
+                                        @Override
+                                          public void update(String status) {}
+                                      });
+
+    assertEquals(patternCnt_10_13, highestSupport(results, features_10_13));
+    assertEquals(patternCnt_10_13_1669, highestSupport(results, returnableFeatures));
+    
+  }
+
+  private long highestSupport(Map<Set<String>, Long> res, Set<String> feats) {
+    Long best= res.get(feats);
+    if (best != null) 
+      return best;
+    else 
+      best= -1L;
+    for (Map.Entry<Set<String>, Long> ent : res.entrySet()) { 
+      Set<String> r= ent.getKey();
+      Long supp= ent.getValue();
+      if (supp <= best) 
+        continue;
+      boolean hasAll= true;
+      for (String f : feats) {
+        if (!r.contains(f)) {
+          hasAll= false;
+          break;
+        }
+      }
+      if (hasAll) 
+        best= supp;
+    }
+    return best;
+  }
+
+  /**
+   * Cross-checks FPGrowth against FPGrowthObj on the synthetic data set:
+   * every pattern reported by one implementation must be found with the same
+   * support by the other (directly, or as a superset via highestSupport).
+   */
+  @Test
+  public void testVsWithSynthData() throws IOException {
+    String inputFilename= "FPGsynth.dat";
+    int minSupport= 100;
+    Set<String> returnableFeatures = new HashSet<String>();
+
+    // not limiting features (or including too many) can cause
+    // the test to run a very long time
+    returnableFeatures.add("10");
+    returnableFeatures.add("13");
+    //    returnableFeatures.add("1669");
+    
+    FPGrowth<String> fp1 = new FPGrowth<String>();
+
+    final Map<Set<String>,Long> results1 = Maps.newHashMap();
+    
+    fp1.generateTopKFrequentPatterns(
+                                     new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+                                     fp1.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+                                                                                                     .openStream()), "\\s+"), minSupport), minSupport, 1000000, 
+                                     returnableFeatures,
+                                     new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+                                       @Override
+                                         public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+                                         for (Pair<List<String>,Long> v : value) {
+                                           List<String> l = v.getFirst();
+                                           results1.put(new HashSet<String>(l), v.getSecond());
+                                           System.out.println("found pat ["+v.getSecond()+"]: "+ v.getFirst());
+                                         }
+                                       }
+        
+                                     }, new StatusUpdater() {
+        
+                                         @Override
+                                           public void update(String status) {}
+                                       });
+
+    // implementation #2 mines with no feature restriction; its results are
+    // filtered down below so the two result sets are comparable
+    FPGrowthObj<String> fp2 = new FPGrowthObj<String>();
+    final Map<Set<String>,Long> initialResults2 = Maps.newHashMap();
+    fp2.generateTopKFrequentPatterns(
+                                     new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+                                     fp2.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+                                                                                                     .openStream()), "\\s+"), minSupport), minSupport, 1000000, 
+                                     new HashSet<String>(),
+                                     new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+                                       @Override
+                                         public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+                                         for (Pair<List<String>,Long> v : value) {
+                                           List<String> l = v.getFirst();
+                                           initialResults2.put(new HashSet<String>(l), v.getSecond());
+                                           System.out.println("found pat ["+v.getSecond()+"]: "+ v.getFirst());
+                                         }
+                                       }
+        
+                                     }, new StatusUpdater() {
+        
+                                         @Override
+                                           public void update(String status) {}
+                                       });
+
+    // keep only itemsets from #2 that touch at least one requested feature
+    Map<Set<String>, Long> results2= new HashMap<Set<String>, Long>();    
+    if (!returnableFeatures.isEmpty()) {
+      Map<Set<String>, Long> tmpResult= new HashMap<Set<String>, Long>();
+      for (Map.Entry<Set<String>, Long> result2 : initialResults2.entrySet()) {
+        Set<String> r2feats= result2.getKey();
+        boolean hasSome= false;
+        for (String rf : returnableFeatures) {
+          if (r2feats.contains(rf)) {
+            hasSome= true;
+            break;
+          }
+        }
+        if (hasSome) 
+          tmpResult.put(result2.getKey(), result2.getValue());
+      }
+      results2= tmpResult;
+    } else {
+      results2= initialResults2;
+    }
+
+    boolean allMatch= true;
+    int itemsetsChecked= 0;
+    for (Map.Entry<Set<String>, Long> result1 : results1.entrySet()) {
+      itemsetsChecked++;
+      Set<String> feats= result1.getKey();
+      long supp1= result1.getValue();
+      long supp2= highestSupport(results2, feats);
+      if (supp1 != supp2) {
+        allMatch= false;
+        System.out.println("mismatch checking results1 [ "+supp1+" vs "+supp2+"]: "+feats);
+      }
+    }
+    System.out.println("checked "+itemsetsChecked+" itemsets iterating through #1");
+
+    itemsetsChecked= 0;
+    for (Map.Entry<Set<String>, Long> result2 : results2.entrySet()) { 
+      itemsetsChecked++;
+      Set<String> feats= result2.getKey();
+      long supp2= result2.getValue();
+      long supp1= highestSupport(results1, feats);
+      if (supp1 != supp2) {
+        allMatch= false;
+        System.out.println("mismatch checking results2 [ "+supp1+" vs "+supp2+"]: "+feats);
+      }
+    }
+    System.out.println("checked "+itemsetsChecked+" itemsets iterating through #2");
+
+    // was: assertEquals("Had mismatches!", allMatch, true) -- expected/actual
+    // were swapped; assertTrue states the intent directly
+    assertTrue("Had mismatches!", allMatch);
+  }
+
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest2.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest2.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest2.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest2.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
+import org.apache.mahout.fpm.pfpgrowth.convertors.SequenceFileOutputCollector;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.StringOutputConverter;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj;
+import org.junit.Test;
+
+/**
+ * Tests for the fpgrowth2 implementation ({@link FPGrowthObj}), mirroring the
+ * existing FPGrowth tests: small hand-built transaction sets are mined and the
+ * resulting top-k patterns are compared against known-good string renderings.
+ */
+public final class FPGrowthTest2 extends MahoutTestCase {
+
+  /**
+   * Mines an 8-transaction data set at minSupport=3 and checks the exact
+   * top-k pattern output written through a SequenceFile round trip.
+   */
+  @Test
+  public void testMaxHeapFPGrowth() throws Exception {
+
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+
+    Collection<Pair<List<String>,Long>> transactions = Lists.newArrayList();
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("E", "A", "D", "B"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("D", "A", "C", "E", "B"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("C", "A", "B", "E"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("B", "A", "D"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("D"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("D", "B"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("A", "D", "E"), 1L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("B", "C"), 1L));
+
+    Path path = getTestTempFilePath("fpgrowthTest.dat");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    SequenceFile.Writer writer =
+        new SequenceFile.Writer(fs, conf, path, Text.class, TopKStringPatterns.class);
+    try {
+    fp.generateTopKFrequentPatterns(
+        transactions.iterator(),
+        fp.generateFList(transactions.iterator(), 3),
+        3,
+        100,
+        new HashSet<String>(),
+        new StringOutputConverter(new SequenceFileOutputCollector<Text,TopKStringPatterns>(writer)),
+        new ContextStatusUpdater(null));
+    } finally {
+      // NOTE(review): closeQuietly swallows errors on close; acceptable for a test
+      Closeables.closeQuietly(writer);
+    }
+
+    List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowthObj.readFrequentPattern(conf, path);
+    assertEquals(
+      "[(C,([B, C],3)), "
+          + "(E,([A, E],4), ([A, B, E],3), ([A, D, E],3)), "
+          + "(A,([A],5), ([A, D],4), ([A, B],4), ([A, B, D],3)), "
+          + "(D,([D],6), ([B, D],4)), "
+          + "(B,([B],6))]",
+      frequentPatterns.toString());
+
+  }
+  
+  /**
+   * Trivial test for MAHOUT-617
+   */
+  @Test
+  public void testMaxHeapFPGrowthData1() throws Exception {
+
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+
+    Collection<Pair<List<String>,Long>> transactions = Lists.newArrayList();
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("X"), 12L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("Y"), 4L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("X", "Y"), 10L));
+
+    Path path = getTestTempFilePath("fpgrowthTestData1.dat");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    System.out.println(fp.generateFList(transactions.iterator(), 2));
+    SequenceFile.Writer writer =
+        new SequenceFile.Writer(fs, conf, path, Text.class, TopKStringPatterns.class);
+    try {
+      fp.generateTopKFrequentPatterns(
+          transactions.iterator(),
+          fp.generateFList(transactions.iterator(), 2),
+          2,
+          100,
+          new HashSet<String>(),
+          new StringOutputConverter(new SequenceFileOutputCollector<Text,TopKStringPatterns>(writer)),
+          new ContextStatusUpdater(null));
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowthObj.readFrequentPattern(conf, path);
+    assertEquals(
+      "[(Y,([Y],14), ([X, Y],10)), (X,([X],22))]", frequentPatterns.toString());
+  }
+  
+  /**
+   * Trivial test for MAHOUT-617
+   */
+  @Test
+  public void testMaxHeapFPGrowthData2() throws Exception {
+
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+
+    Collection<Pair<List<String>,Long>> transactions = Lists.newArrayList();
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("X"), 12L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("Y"), 4L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("X", "Y"), 10L));
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("X", "Y", "Z"), 11L));
+
+    Path path = getTestTempFilePath("fpgrowthTestData2.dat");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    System.out.println(fp.generateFList(transactions.iterator(), 2));
+    SequenceFile.Writer writer =
+        new SequenceFile.Writer(fs, conf, path, Text.class, TopKStringPatterns.class);
+    try {
+      fp.generateTopKFrequentPatterns(
+          transactions.iterator(),
+          fp.generateFList(transactions.iterator(), 2),
+          2,
+          100,
+          new HashSet<String>(),
+          new StringOutputConverter(new SequenceFileOutputCollector<Text,TopKStringPatterns>(writer)),
+          new ContextStatusUpdater(null));
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowthObj.readFrequentPattern(conf, path);
+    assertEquals(
+      "[(Z,([X, Y, Z],11)), (Y,([Y],25), ([X, Y],21)), (X,([X],33))]",
+      frequentPatterns.toString());
+  }
+
+  /**
+   * Trivial test for MAHOUT-355
+   */
+  @Test
+  public void testNoNullPointerExceptionWhenReturnableFeaturesIsNull() throws Exception {
+
+    FPGrowthObj<String> fp = new FPGrowthObj<String>();
+
+    Collection<Pair<List<String>,Long>> transactions = Lists.newArrayList();
+    transactions.add(new Pair<List<String>,Long>(Arrays.asList("E", "A", "D", "B"), 1L));
+
+    OutputCollector<String, List<Pair<List<String>, Long>>> noOutput =
+        new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+      @Override
+      public void collect(String arg0, List<Pair<List<String>, Long>> arg1) { 
+      }
+    };
+
+    // passing null returnableFeatures must not throw (MAHOUT-355)
+    fp.generateTopKFrequentPatterns(
+        transactions.iterator(),
+        fp.generateFList(transactions.iterator(), 3),
+        3,
+        100,
+        null,
+        noOutput,
+        new ContextStatusUpdater(null));
+  }
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java?rev=1231700&r1=1231699&r2=1231700&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java Sun Jan 15 16:12:14 2012
@@ -87,11 +87,67 @@ public class PFPGrowthRetailDataTest ext
     }
   }
   
+   
+  /**
+   * Test Parallel FPGrowth on retail data using top-level runPFPGrowth() method
+   */ 
   @Test
   public void testRetailDataMinSup100() throws Exception {
     StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
       "retail_results_with_min_sup_100.dat").openStream()), "\\s+");
-    
+    // parse the reference results: last token of each line is "(support)"
+    Map<Set<String>,Long> expectedResults = Maps.newHashMap();
+    while (it.hasNext()) {
+      Pair<List<String>,Long> next = it.next();
+      List<String> items = Lists.newArrayList(next.getFirst());
+      String supportString = items.remove(items.size() - 1);
+      Long support = Long.parseLong(supportString.substring(1, supportString.length() - 1));
+      expectedResults.put(new HashSet<String>(items), support);
+    }
+
+    PFPGrowth.runPFPGrowth(params);
+
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+  
+    Map<Set<String>,Long> results = Maps.newHashMap();
+    for (Pair<String,TopKStringPatterns> topK : frequentPatterns) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        results.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+  
+    for (Entry<Set<String>,Long> entry : results.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (expectedResults.get(key) == null) {
+        System.out.println("spurious (1): " + key+ " with " +entry.getValue());
+      } else {
+        if (!expectedResults.get(key).equals(results.get(entry.getKey()))) {
+          // fixed: an accidentally duplicated "+" here previously applied a
+          // unary plus to the Long instead of a plain concatenation
+          System.out.println("invalid (1): " + key + ", expected: " + expectedResults.get(key) + ", got: "
+                             + results.get(entry.getKey()));
+        } else {
+          System.out.println("matched (1): " + key + ", with: " + expectedResults.get(key));
+        }
+      }
+    }
+  
+    for (Entry<Set<String>,Long> entry : expectedResults.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (results.get(key) == null) {
+        System.out.println("missing (1): " + key+ " with " +entry.getValue());
+      }
+    }
+    assertEquals(expectedResults.size(), results.size());
+  }
+  
+
+  /**
+   * Test Parallel FPG on retail data, running various stages individually
+   */ 
+  @Test
+  public void testRetailDataMinSup100InSteps() throws Exception {
+    StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+      "retail_results_with_min_sup_100.dat").openStream()), "\\s+");   
     Map<Set<String>,Long> expectedResults = Maps.newHashMap();
     while (it.hasNext()) {
       Pair<List<String>,Long> next = it.next();
@@ -103,11 +159,16 @@ public class PFPGrowthRetailDataTest ext
     Configuration conf = new Configuration();
     log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startParallelCounting(params, conf);
-    log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params, conf);
-    log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params, conf);
-    PFPGrowth.startTransactionSorting(params, conf);
+
+    List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
+    PFPGrowth.saveFList(fList, params, conf);
+    int numGroups = params.getInt(PFPGrowth.NUM_GROUPS, 
+                                  PFPGrowth.NUM_GROUPS_DEFAULT);
+    int maxPerGroup = fList.size() / numGroups;
+    if (fList.size() % numGroups != 0) 
+      maxPerGroup++;
+    params.set(PFPGrowth.MAX_PER_GROUP, Integer.toString(maxPerGroup));
+
     PFPGrowth.startParallelFPGrowth(params, conf);
     log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startAggregating(params, conf);
@@ -125,11 +186,13 @@ public class PFPGrowthRetailDataTest ext
     for (Entry<Set<String>,Long> entry : results.entrySet()) {
       Set<String> key = entry.getKey();
       if (expectedResults.get(key) == null) {
-        System.out.println("missing: " + key);
+        System.out.println("spurious (2): " + key);
       } else {
         if (!expectedResults.get(key).equals(results.get(entry.getKey()))) {
-          System.out.println("invalid: " + key + ", expected: " + expectedResults.get(key) + ", got: "
+          System.out.println("invalid (2): " + key + ", expected: " + expectedResults.get(key) + ", got: "
                              + results.get(entry.getKey()));
+        } else {
+          System.out.println("matched (2): " + key + ", with: " + expectedResults.get(key));
         }
       }
     }

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest2.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest2.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest2.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest2.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth2;
+
+import java.io.File;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.PFPGrowth;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Resources;
+
+/**
+ * Parallel FPGrowth tests on the retail data set with USE_FPG2 enabled,
+ * mirroring PFPGrowthRetailDataTest for the fpgrowth2 implementation.
+ *
+ * NOTE(review): the package is org.apache.mahout.fpm.pfpgrowth2 while the
+ * file lives under .../fpm/pfpgrowth/ -- confirm the intended location.
+ */
+public class PFPGrowthRetailDataTest2 extends MahoutTestCase {
+  
+  private final Parameters params = new Parameters();
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowthRetailDataTest2.class);
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    params.set(PFPGrowth.MIN_SUPPORT, "100");
+    params.set(PFPGrowth.MAX_HEAPSIZE, "10000");
+    params.set(PFPGrowth.NUM_GROUPS, "50");
+    params.set(PFPGrowth.ENCODING, "UTF-8");
+    params.set(PFPGrowth.USE_FPG2, "true");
+    File inputDir = getTestTempDir("transactions");
+    File outputDir = getTestTempDir("frequentpatterns");
+    File input = new File(inputDir, "test.txt");
+    params.set(PFPGrowth.INPUT, input.getAbsolutePath());
+    params.set(PFPGrowth.OUTPUT, outputDir.getAbsolutePath());
+    // rewrite retail.dat (whitespace-separated) as comma-separated input
+    Writer writer = Files.newWriter(input, Charsets.UTF_8);
+    try {
+      StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+        "retail.dat").openStream()), "\\s+");
+      Collection<List<String>> transactions = Lists.newArrayList();
+      
+      while (it.hasNext()) {
+        Pair<List<String>,Long> next = it.next();
+        transactions.add(next.getFirst());
+      }
+      
+      for (List<String> transaction : transactions) {
+        String sep = "";
+        for (String item : transaction) {
+          writer.write(sep + item);
+          sep = ",";
+        }
+        writer.write("\n");
+      }
+      
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+  
+  /**
+   * Test Parallel FPGrowth on retail data using top-level runPFPGrowth() method
+   */ 
+  @Test
+  public void testRetailDataMinSup100() throws Exception {
+    StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+      "retail_results_with_min_sup_100.dat").openStream()), "\\s+");
+    Map<Set<String>,Long> expectedResults = Maps.newHashMap();
+    while (it.hasNext()) {
+      Pair<List<String>,Long> next = it.next();
+      List<String> items = Lists.newArrayList(next.getFirst());
+      String supportString = items.remove(items.size() - 1);
+      Long support = Long.parseLong(supportString.substring(1, supportString.length() - 1));
+      expectedResults.put(new HashSet<String>(items), support);
+    }
+
+    PFPGrowth.runPFPGrowth(params);
+
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+    
+    Map<Set<String>,Long> results = Maps.newHashMap();
+    for (Pair<String,TopKStringPatterns> topK : frequentPatterns) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        results.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+    
+    for (Entry<Set<String>,Long> entry : results.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (expectedResults.get(key) == null) {
+        System.out.println("spurious (1): " + key+ " with " +entry.getValue());
+      } else {
+        if (!expectedResults.get(key).equals(results.get(entry.getKey()))) {
+          System.out.println("invalid (1): " + key + ", expected: " + expectedResults.get(key) + ", got: "
+                             + results.get(entry.getKey()));
+        } else {
+          System.out.println("matched (1): " + key + ", with: " + expectedResults.get(key));
+        }
+      }
+    }
+    
+    for (Entry<Set<String>,Long> entry : expectedResults.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (results.get(key) == null) {
+        System.out.println("missing (1): " + key+ " with " +entry.getValue());
+      }
+    }
+    assertEquals(expectedResults.size(), results.size());
+  }
+
+  /**
+   * Test Parallel FPG on retail data, running various stages individually
+   */ 
+  @Test
+  public void testRetailDataMinSup100InSteps() throws Exception {
+    StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+      "retail_results_with_min_sup_100.dat").openStream()), "\\s+");    
+    Map<Set<String>,Long> expectedResults = Maps.newHashMap();
+    while (it.hasNext()) {
+      Pair<List<String>,Long> next = it.next();
+      List<String> items = Lists.newArrayList(next.getFirst());
+      String supportString = items.remove(items.size() - 1);
+      Long support = Long.parseLong(supportString.substring(1, supportString.length() - 1));
+      expectedResults.put(new HashSet<String>(items), support);
+    }
+    Configuration conf = new Configuration();
+    log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startParallelCounting(params, conf);
+
+    List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
+    PFPGrowth.saveFList(fList, params, conf);
+
+    // ceiling division: each group gets at most ceil(|fList| / numGroups) items
+    int numGroups = params.getInt(PFPGrowth.NUM_GROUPS, 
+                                  PFPGrowth.NUM_GROUPS_DEFAULT);
+    int maxPerGroup = fList.size() / numGroups;
+    if (fList.size() % numGroups != 0) 
+      maxPerGroup++;
+    params.set(PFPGrowth.MAX_PER_GROUP, Integer.toString(maxPerGroup));
+
+    log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startParallelFPGrowth(params, conf);
+    log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startAggregating(params, conf);
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+    
+    Map<Set<String>,Long> results = Maps.newHashMap();
+    for (Pair<String,TopKStringPatterns> topK : frequentPatterns) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        results.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+    
+    for (Entry<Set<String>,Long> entry : results.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (expectedResults.get(key) == null) {
+        System.out.println("spurious (2): " + key + ", " + entry.getValue());
+      } else {
+        if (!expectedResults.get(key).equals(results.get(entry.getKey()))) {
+          System.out.println("invalid (2): " + key + ", expected: " + expectedResults.get(key) + ", got: "
+                             + results.get(entry.getKey()));
+        } else {
+          System.out.println("matched (2): " + key + ", with: " + expectedResults.get(key));
+        }         
+      }
+    }
+    
+    for (Entry<Set<String>,Long> entry : expectedResults.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (results.get(key) == null) {
+        // label normalized from "missing2:" for consistency with the
+        // spurious/invalid/matched "(2)" diagnostics above
+        System.out.println("missing (2): " + key + ", " + entry.getValue());
+      }
+    }
+    assertEquals(expectedResults.size(), results.size());
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTestVs.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTestVs.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTestVs.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTestVs.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.File;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+
+/**
+ * Runs Parallel FPGrowth over the retail data twice -- once with the default
+ * implementation and once with USE_FPG2 -- and asserts both produce identical
+ * pattern/support results.
+ */
+public final class PFPGrowthRetailDataTestVs extends MahoutTestCase {
+
+  private final Parameters paramsImpl1 = new Parameters();
+  private final Parameters paramsImpl2 = new Parameters();
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowthRetailDataTestVs.class);
+
+  /**
+   * Returns the support recorded for {@code feats}, or the highest support of
+   * any superset itemset, or -1 when neither exists.
+   * NOTE(review): currently unused within this class -- kept for parity with
+   * the equivalent helper in the non-parallel comparison test.
+   */
+  private static long bestResults(Map<Set<String>, Long> res, Set<String> feats) {
+    Long best = res.get(feats);
+    if (best != null) 
+      return best;
+    else 
+      best = -1L;
+    for (Map.Entry<Set<String>, Long> ent : res.entrySet()) { 
+      Set<String> r = ent.getKey();
+      Long supp = ent.getValue();
+      if (supp <= best) 
+        continue;
+      boolean hasAll = true;
+      for (String f : feats) {
+        if (!r.contains(f)) {
+          hasAll = false;
+          break;
+        }
+      }
+      if (hasAll) 
+        best = supp;
+    }
+    return best;
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    File inputDir = getTestTempDir("transactions");
+    File input = new File(inputDir, "test.txt");
+
+    paramsImpl1.set(PFPGrowth.MIN_SUPPORT, "100");
+    paramsImpl1.set(PFPGrowth.MAX_HEAPSIZE, "10000");
+    paramsImpl1.set(PFPGrowth.NUM_GROUPS, "50");
+    paramsImpl1.set(PFPGrowth.ENCODING, "UTF-8");
+    paramsImpl1.set(PFPGrowth.INPUT, input.getAbsolutePath());
+
+    // identical configuration except for the fpgrowth2 switch
+    paramsImpl2.set(PFPGrowth.MIN_SUPPORT, "100");
+    paramsImpl2.set(PFPGrowth.MAX_HEAPSIZE, "10000");
+    paramsImpl2.set(PFPGrowth.NUM_GROUPS, "50");
+    paramsImpl2.set(PFPGrowth.ENCODING, "UTF-8");
+    paramsImpl2.set(PFPGrowth.INPUT, input.getAbsolutePath());
+    paramsImpl2.set(PFPGrowth.USE_FPG2, "true");
+
+    File outputDir1 = getTestTempDir("frequentpatterns1");
+    paramsImpl1.set(PFPGrowth.OUTPUT, outputDir1.getAbsolutePath());
+
+    File outputDir2 = getTestTempDir("frequentpatterns2");
+    paramsImpl2.set(PFPGrowth.OUTPUT, outputDir2.getAbsolutePath());
+
+    Writer writer = Files.newWriter(input, Charsets.UTF_8);
+    try {
+      StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+        "retail.dat").openStream()), "\\s+");
+      Collection<List<String>> transactions = Lists.newArrayList();
+      
+      while (it.hasNext()) {
+        Pair<List<String>,Long> next = it.next();
+        transactions.add(next.getFirst());
+      }
+      
+      for (List<String> transaction : transactions) {
+        String sep = "";
+        for (String item : transaction) {
+          writer.write(sep + item);
+          sep = ",";
+        }
+        writer.write("\n");
+      }
+      
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+  
+   
+  /**
+   * Test Parallel FPGrowth on retail data using top-level runPFPGrowth() method
+   */ 
+  @Test
+  public void testParallelRetailVs() throws Exception {
+
+    PFPGrowth.runPFPGrowth(paramsImpl1);
+    List<Pair<String,TopKStringPatterns>> frequentPatterns1 = PFPGrowth.readFrequentPattern(paramsImpl1);
+    
+    Map<Set<String>,Long> results1 = Maps.newHashMap();
+    for (Pair<String,TopKStringPatterns> topK : frequentPatterns1) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        results1.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+  
+    PFPGrowth.runPFPGrowth(paramsImpl2);
+    List<Pair<String,TopKStringPatterns>> frequentPatterns2 = PFPGrowth.readFrequentPattern(paramsImpl2);
+  
+    Map<Set<String>,Long> results2 = Maps.newHashMap();
+    for (Pair<String,TopKStringPatterns> topK : frequentPatterns2) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        results2.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+  
+    for (Entry<Set<String>,Long> entry : results1.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (results2.get(key) == null) {
+        System.out.println("spurious (1): " + key+ " with " +entry.getValue());
+      } else {
+        if (!results2.get(key).equals(results1.get(entry.getKey()))) {
+          // fixed: an accidentally duplicated "+" here previously applied a
+          // unary plus to the Long instead of a plain concatenation
+          System.out.println("invalid (1): " + key + ", expected: " + results2.get(key) + ", got: "
+                             + results1.get(entry.getKey()));
+        } else {
+          System.out.println("matched (1): " + key + ", with: " + results2.get(key));
+        }
+      }
+    }
+  
+    for (Entry<Set<String>,Long> entry : results2.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (results1.get(key) == null) {
+        System.out.println("missing (1): " + key+ " with " +entry.getValue());
+      }
+    }
+    assertEquals(results2.size(), results1.size());
+  }
+
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthSynthDataTest2.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthSynthDataTest2.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthSynthDataTest2.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthSynthDataTest2.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth2;
+
+import java.io.File;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.StringRecordIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.StatusUpdater;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.PFPGrowth;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Resources;
+
+public class PFPGrowthSynthDataTest2 extends MahoutTestCase {
+  
+  private final Parameters params = new Parameters();
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowthSynthDataTest2.class);
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    params.set(PFPGrowth.MIN_SUPPORT, "100");
+    params.set(PFPGrowth.MAX_HEAPSIZE, "10000");
+    params.set(PFPGrowth.NUM_GROUPS, "50");
+    params.set(PFPGrowth.ENCODING, "UTF-8");
+    params.set(PFPGrowth.USE_FPG2, "true");
+    params.set(PFPGrowth.SPLIT_PATTERN, " ");
+    File inputDir = getTestTempDir("transactions");
+    File outputDir = getTestTempDir("frequentpatterns");
+    File input = new File(inputDir, "synth_test.txt");
+    params.set(PFPGrowth.INPUT, input.getAbsolutePath());
+    params.set(PFPGrowth.OUTPUT, outputDir.getAbsolutePath());
+    Writer writer = Files.newWriter(input, Charsets.UTF_8);
+    try {
+      StringRecordIterator it = new StringRecordIterator(new FileLineIterable(Resources.getResource(
+        "FPGsynth.dat").openStream()), "\\s+");
+      Collection<List<String>> transactions = Lists.newArrayList();
+      
+      while (it.hasNext()) {
+        Pair<List<String>,Long> next = it.next();
+        transactions.add(next.getFirst());
+      }
+      
+      for (List<String> transaction : transactions) {
+        String sep = "";
+        for (String item : transaction) {
+          writer.write(sep + item);
+          sep = " ";
+        }
+        writer.write("\n");
+      }
+      
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+
+  private long highestSupport(Map<Set<String>, Long> res, Set<String> feats) {
+    Long best= res.get(feats);
+    if (best != null) 
+      return best;
+    else 
+      best= -1L;
+    for (Map.Entry<Set<String>, Long> ent : res.entrySet()) { 
+      Set<String> r= ent.getKey();
+      Long supp= ent.getValue();
+      if (supp <= best) 
+        continue;
+      boolean hasAll= true;
+      for (String f : feats) {
+        if (!r.contains(f)) {
+          hasAll= false;
+          break;
+        }
+      }
+      if (hasAll) 
+        best= supp;
+    }
+    return best;
+  }
+
+  @Test
+  public void testVsSequential() throws Exception {
+
+    final Map<Set<String>,Long> parallelResult = Maps.newHashMap();
+
+    PFPGrowth.runPFPGrowth(params);
+    List<Pair<String,TopKStringPatterns>> tmpParallel = PFPGrowth.readFrequentPattern(params);
+    
+    for (Pair<String,TopKStringPatterns> topK : tmpParallel) {
+      Iterator<Pair<List<String>,Long>> topKIt = topK.getSecond().iterator();
+      while (topKIt.hasNext()) {
+        Pair<List<String>,Long> entry = topKIt.next();
+        parallelResult.put(new HashSet<String>(entry.getFirst()), entry.getSecond());
+      }
+    }
+
+    //////
+
+    String inputFilename= "FPGsynth.dat";
+    int minSupport= 100;
+
+    final Map<Set<String>,Long> seqResult = Maps.newHashMap();
+    
+    FPGrowthObj<String> fpSeq = new FPGrowthObj<String>();
+    fpSeq.generateTopKFrequentPatterns(
+      new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename).openStream()), "\\s+"),
+
+      fpSeq.generateFList(new StringRecordIterator(new FileLineIterable(Resources.getResource(inputFilename)
+           .openStream()), "\\s+"), minSupport), minSupport, 1000000, 
+      null,
+      new OutputCollector<String,List<Pair<List<String>,Long>>>() {
+        
+        @Override
+        public void collect(String key, List<Pair<List<String>,Long>> value) {
+          
+          for (Pair<List<String>,Long> v : value) {
+            List<String> l = v.getFirst();
+            seqResult.put(new HashSet<String>(l), v.getSecond());
+          }
+        }
+        
+      }, new StatusUpdater() {
+        
+        @Override
+        public void update(String status) {}
+      });
+
+    for (Entry<Set<String>,Long> entry : parallelResult.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (seqResult.get(key) == null) {
+        log.info("spurious (1): " + key+ " with " +entry.getValue());
+      } else {
+        if (!seqResult.get(key).equals(parallelResult.get(entry.getKey()))) {
+          log.info("invalid (1): " + key + ", expected: " + seqResult.get(key) + ", got: "
+                             +                             + parallelResult.get(entry.getKey()));
+        } else {
+          log.info("matched (1): " + key + ", with: " + seqResult.get(key));
+        }
+      }
+    }
+  
+    for (Entry<Set<String>,Long> entry : seqResult.entrySet()) {
+      Set<String> key = entry.getKey();
+      if (parallelResult.get(key) == null) {
+        log.info("missing (1): " + key+ " with " +entry.getValue());
+      }
+    }
+    assertEquals(seqResult.size(), parallelResult.size());
+  }
+
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java?rev=1231700&r1=1231699&r2=1231700&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java Sun Jan 15 16:12:14 2012
@@ -79,9 +79,31 @@ public final class PFPGrowthTest extends
     
   }
 
+  /**
+   * Test Parallel FPGrowth on small example data using top-level
+   * runPFPGrowth() method
+   */ 
   @Test
   public void testStartParallelFPGrowth() throws Exception {
     Configuration conf = new Configuration();
+    PFPGrowth.runPFPGrowth(params);
+
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+
+    assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
+                 + "(B,([B],6), ([B, D],4), ([B, A],4), ([B, D, A],3)), " 
+                 + "(C,([B, C],3)), "
+                 + "(D,([D],6), ([D, A],4), ([B, D],4), ([D, A, E],3)), "
+                 + "(E,([A, E],4), ([D, A, E],3), ([B, A, E],3))]", frequentPatterns.toString());
+  }
+
+  /**
+   * Test Parallel FPGrowth on small example data using top-level
+   * runPFPGrowth() method
+   */ 
+  @Test
+  public void testStartParallelFPGrowthInSteps() throws Exception {
+    Configuration conf = new Configuration();
     log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startParallelCounting(params, conf);
     log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
@@ -89,12 +111,15 @@ public final class PFPGrowthTest extends
     log.info("{}", fList);
     assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
  
-    log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params, conf);
- 
+    PFPGrowth.saveFList(fList, params, conf);
+    int numGroups = params.getInt(PFPGrowth.NUM_GROUPS, 
+                                  PFPGrowth.NUM_GROUPS_DEFAULT);
+    int maxPerGroup = fList.size() / numGroups;
+    if (fList.size() % numGroups != 0) 
+      maxPerGroup++;
+    params.set(PFPGrowth.MAX_PER_GROUP, Integer.toString(maxPerGroup));
+
     log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params, conf);
-    PFPGrowth.startTransactionSorting(params, conf);
     PFPGrowth.startParallelFPGrowth(params, conf);
     log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startAggregating(params, conf);

Added: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest2.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest2.java?rev=1231700&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest2.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest2.java Sun Jan 15 16:12:14 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.File;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class PFPGrowthTest2 extends MahoutTestCase {
+  
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowthTest.class);
+  
+  private final Parameters params = new Parameters();
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    params.set(PFPGrowth.MIN_SUPPORT, "3");
+    params.set(PFPGrowth.MAX_HEAPSIZE, "4");
+    params.set(PFPGrowth.NUM_GROUPS, "2");
+    params.set(PFPGrowth.ENCODING, "UTF-8");
+    params.set(PFPGrowth.USE_FPG2, "true");
+    File inputDir = getTestTempDir("transactions");
+    File outputDir = getTestTempDir("frequentpatterns");
+    File input = new File(inputDir, "test.txt");
+    params.set(PFPGrowth.INPUT, input.getAbsolutePath());
+    params.set(PFPGrowth.OUTPUT, outputDir.getAbsolutePath());
+    Writer writer = Files.newWriter(input, Charsets.UTF_8);
+    try {
+      Collection<List<String>> transactions = Lists.newArrayList();
+      transactions.add(Arrays.asList("E", "A", "D", "B"));
+      transactions.add(Arrays.asList("D", "A", "C", "E", "B"));
+      transactions.add(Arrays.asList("C", "A", "B", "E"));
+      transactions.add(Arrays.asList("B", "A", "D"));
+      transactions.add(Arrays.asList("D"));
+      transactions.add(Arrays.asList("D", "B"));
+      transactions.add(Arrays.asList("A", "D", "E"));
+      transactions.add(Arrays.asList("B", "C"));
+      for (List<String> transaction : transactions) {
+        String sep = "";
+        for (String item : transaction) {
+          writer.write(sep + item);
+          sep = ",";
+        }
+        writer.write("\n");
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+    
+  }
+
+  /**
+   * Test Parallel FPGrowth on small example data using top-level
+   * runPFPGrowth() method
+   */ 
+  @Test
+  public void testStartParallelFPGrowth() throws Exception {
+    Configuration conf = new Configuration();
+    PFPGrowth.runPFPGrowth(params);
+
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+
+    assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
+                 + "(B,([B],6), ([B, D],4), ([B, A],4), ([B, D, A],3)), " 
+                 + "(C,([B, C],3)), "
+                 + "(D,([D],6), ([D, A],4), ([B, D],4), ([D, A, E],3)), "
+                 + "(E,([A, E],4), ([D, A, E],3), ([B, A, E],3))]", frequentPatterns.toString());                                                                 
+  }
+
+  /**
+   * Test Parallel FPG on small example data, running various stages
+   * individually
+   */ 
+  @Test
+  public void testStartParallelFPGrowthInSteps() throws Exception {
+    Configuration conf = new Configuration();
+    log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startParallelCounting(params, conf);
+    log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
+    log.info("{}", fList);
+    assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
+ 
+    PFPGrowth.saveFList(fList, params, conf);
+    int numGroups = params.getInt(PFPGrowth.NUM_GROUPS, 
+                                  PFPGrowth.NUM_GROUPS_DEFAULT);
+    int maxPerGroup = fList.size() / numGroups;
+    if (fList.size() % numGroups != 0) 
+      maxPerGroup++;
+    params.set(PFPGrowth.MAX_PER_GROUP, Integer.toString(maxPerGroup));
+
+    log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startParallelFPGrowth(params, conf);
+    log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
+    PFPGrowth.startAggregating(params, conf);
+
+    List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
+
+    assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
+                 + "(B,([B],6), ([B, D],4), ([B, A],4), ([B, D, A],3)), " 
+                 + "(C,([B, C],3)), "
+                 + "(D,([D],6), ([D, A],4), ([B, D],4), ([D, A, E],3)), "
+                 + "(E,([A, E],4), ([D, A, E],3), ([B, A, E],3))]", frequentPatterns.toString());                                                                 
+  }
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java?rev=1231700&r1=1231699&r2=1231700&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java Sun Jan 15 16:12:14 2012
@@ -21,10 +21,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
-import com.google.common.collect.Lists;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.list.IntArrayList;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -52,8 +52,8 @@ public final class TransactionTreeTest e
     gen = RandomUtils.getRandom();
   }
 
-  private List<Integer> generateRandomArray() {
-    List<Integer> list = Lists.newArrayList();
+  private IntArrayList generateRandomArray() {
+    IntArrayList list = new IntArrayList();
     for (int i = 0; i < MAX_FEATURES; i++) {
       if (gen.nextInt() % SKIP_RATE == 0) {
         list.add(i);
@@ -69,7 +69,7 @@ public final class TransactionTreeTest e
     int nodes = 0;
     int total = 0;
     for (int i = 0; i < MAX_TRANSACTIONS; i++) {
-      List<Integer> array = generateRandomArray();
+      IntArrayList array = generateRandomArray();
       total += array.size();
       nodes += tree.addPattern(array, 1 + gen.nextInt(MAX_DUPLICATION));
     }
@@ -84,9 +84,9 @@ public final class TransactionTreeTest e
     StringBuilder sb = new StringBuilder();
     int count = 0;
     int items = 0;
-    Iterator<Pair<List<Integer>,Long>> it = tree.iterator();
+    Iterator<Pair<IntArrayList,Long>> it = tree.iterator();
     while (it.hasNext()) {
-      Pair<List<Integer>,Long> p = it.next();
+      Pair<IntArrayList,Long> p = it.next();
       vtree.addPattern(p.getFirst(), p.getSecond());
       items += p.getFirst().size();
       count++;



Mime
View raw message