From sro...@apache.org
Subject svn commit: r1138664 - in /mahout/trunk/core/src: main/java/org/apache/mahout/fpm/pfpgrowth/ test/java/org/apache/mahout/fpm/pfpgrowth/
Date Wed, 22 Jun 2011 22:29:36 GMT
Author: srowen
Date: Wed Jun 22 22:29:35 2011
New Revision: 1138664

URL: http://svn.apache.org/viewvc?rev=1138664&view=rev
Log:
MAHOUT-632 Robin's fix to use distributed cache
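
The fix replaces the fList/gList strings that were previously encoded with
DefaultStringifier and passed through the job Configuration; they are now written
as SequenceFiles and shipped to the tasks via the DistributedCache. Below is a
minimal sketch of that round trip, assuming Hadoop 0.20-era APIs; the class name
CacheRoundTrip and the sample (featureA, 42) record are illustrative, not part of
the commit:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public final class CacheRoundTrip {

  // Driver side: write (feature, count) pairs and register the file with the cache.
  static void writeAndCache(Configuration conf, Path path) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    path = fs.makeQualified(path);
    SequenceFile.Writer writer =
        new SequenceFile.Writer(fs, conf, path, Text.class, LongWritable.class);
    try {
      writer.append(new Text("featureA"), new LongWritable(42)); // sample record
    } finally {
      writer.close();
    }
    DistributedCache.addCacheFile(path.toUri(), conf); // visible to every task
  }

  // Task side, e.g. from Mapper.setup(): locate the cached file and read it back.
  static void readBack(Configuration conf) throws IOException {
    URI[] files = DistributedCache.getCacheFiles(conf);
    if (files == null || files.length < 1) {
      throw new IOException("Expected file missing from DistributedCache");
    }
    SequenceFile.Reader reader =
        new SequenceFile.Reader(FileSystem.get(conf), new Path(files[0].getPath()), conf);
    try {
      Text feature = new Text();
      LongWritable count = new LongWritable();
      while (reader.next(feature, count)) { // iterate over (key, value) records
        System.out.println(feature + " -> " + count.get());
      }
    } finally {
      reader.close();
    }
  }
}

In the committed code, readFList and readGList rely on registration order: saveFList
is called before saveGList, so files[0] is the frequency list and files[1] the
grouping list.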

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Wed Jun 22 22:29:35 2011
@@ -18,33 +18,38 @@
 package org.apache.mahout.fpm.pfpgrowth;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.GenericsUtil;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.Parameters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
 import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
 import org.slf4j.Logger;
@@ -70,7 +75,7 @@ public final class PFPGrowth {
   public static final String FILE_PATTERN = "part-*";
   public static final String FPGROWTH = "fpgrowth";
   public static final String FREQUENT_PATTERNS = "frequentpatterns";
-  public static final String PARALLEL_COUNTING = "parallelcounting";  
+  public static final String PARALLEL_COUNTING = "parallelcounting";
   public static final String SORTED_OUTPUT = "sortedoutput";
   public static final String SPLIT_PATTERN = "splitPattern";
 
@@ -78,46 +83,89 @@ public final class PFPGrowth {
   
   private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
   
-  private PFPGrowth() { }
+  private PFPGrowth() {}
   
   /**
    * Generates the fList from the serialized string representation
-   *
+   * 
    * @return Deserialized Feature Frequency List
    */
-  public static List<Pair<String,Long>> deserializeList(Parameters params,
-                                                        String key,
-                                                        Configuration conf) throws IOException {
-    List<Pair<String,Long>> list = Lists.newArrayList();
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
-    
-    DefaultStringifier<List<Pair<String,Long>>> listStringifier = new DefaultStringifier<List<Pair<String,Long>>>(
-        conf, GenericsUtil.getClass(list));
-    String serializedString = params.get(key, listStringifier.toString(list));
-    list = listStringifier.fromString(serializedString);
+  public static List<Pair<String,Long>> readFList(Configuration conf) throws IOException {
+    List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    if (files == null || files.length < 2) {
+      throw new IOException("Cannot read Frequency list and Grouping list from Distributed Cache");
+    }
+    for (Pair<Text,LongWritable> record :
+         new SequenceFileIterable<Text,LongWritable>(new Path(files[0].getPath()), true, conf)) {
+      list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));
+    }
     return list;
   }
   
   /**
   * Generates the gList(Group ID Mapping of Various frequent Features) Map from the corresponding serialized
    * representation
-   *
+   * 
    * @return Deserialized Group List
    */
-  public static Map<String,Long> deserializeMap(Parameters params, String key, Configuration conf) throws IOException {
-    Map<String,Long> map = Maps.newHashMap();
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
-    
-    DefaultStringifier<Map<String,Long>> mapStringifier = new DefaultStringifier<Map<String,Long>>(conf,
-        GenericsUtil.getClass(map));
-    String gListString = params.get(key, mapStringifier.toString(map));
-    map = mapStringifier.fromString(gListString);
+  public static Map<String,Long> readGList(Configuration conf) throws IOException {
+    Map<String,Long> map = new HashMap<String,Long>();
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    if (files == null || files.length < 2) {
+      throw new IOException("Cannot read Frequency list and Grouping list from Distributed Cache");
+    }
+    for (Pair<Text,LongWritable> record :
+         new SequenceFileIterable<Text,LongWritable>(new Path(files[1].getPath()), true, conf)) {
+      map.put(record.getFirst().toString(), record.getSecond().get());
+    }
     return map;
   }
   
   /**
+   * Writes the fList to a SequenceFile of (feature, frequency) pairs under the output
+   * directory and registers that file with the DistributedCache
+   */
+  private static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)
+    throws IOException {
+    Path flistPath = new Path(params.get(OUTPUT), F_LIST);
+    FileSystem fs = FileSystem.get(conf);
+    flistPath = fs.makeQualified(flistPath);
+    HadoopUtil.delete(conf, flistPath);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
+    try {
+      for (Pair<String,Long> pair : flist) {
+        writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));
+      }
+    } finally {
+      writer.close();
+    }
+    DistributedCache.addCacheFile(flistPath.toUri(), conf);
+  }
+  
+  /**
+   * Writes the gList (feature to group ID map) to a SequenceFile under the output
+   * directory and registers that file with the DistributedCache
+   */
+  private static void saveGList(Map<String,Long> glist, Parameters params, Configuration conf) throws IOException {
+    Path flistPath = new Path(params.get(OUTPUT), G_LIST);
+    FileSystem fs = FileSystem.get(conf);
+    flistPath = fs.makeQualified(flistPath);
+    HadoopUtil.delete(conf, flistPath);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
+    try {
+      for (Entry<String,Long> pair : glist.entrySet()) {
+        writer.append(new Text(pair.getKey()), new LongWritable(pair.getValue()));
+      }
+    } finally {
+      writer.close();
+    }
+    DistributedCache.addCacheFile(flistPath.toUri(), conf);
+  }
+  
+  /**
    * read the feature frequency List which is built at the end of the Parallel counting job
    * 
    * @return Feature Frequency List
@@ -125,14 +173,24 @@ public final class PFPGrowth {
   public static List<Pair<String,Long>> readFList(Parameters params) {
     int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
     Configuration conf = new Configuration();
+    
+    Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
 
-    PriorityQueue<Pair<String,Long>> queue =
-        new PriorityQueue<Pair<String,Long>>(11, new CountDescendingPairComparator<String,Long>());
+    PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
+        new Comparator<Pair<String,Long>>() {
+          @Override
+          public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
+            int ret = o2.getSecond().compareTo(o1.getSecond());
+            if (ret != 0) {
+              return ret;
+            }
+            return o1.getFirst().compareTo(o2.getFirst());
+          }
+        });
 
-    Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
-    Path filesPattern = new Path(parallelCountingPath, FILE_PATTERN);
-    for (Pair<Writable,LongWritable> record
-         : new SequenceFileDirIterable<Writable,LongWritable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+    for (Pair<Text,LongWritable> record :
+         new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN),
+                                                        PathType.GLOB, null, null, true, conf)) {
       long value = record.getSecond().get();
       if (value >= minSupport) {
         queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
@@ -153,7 +211,7 @@ public final class PFPGrowth {
   public static List<Pair<String,TopKStringPatterns>> readFrequentPattern(Parameters params) throws IOException {
     
     Configuration conf = new Configuration();
-
+    
     Path frequentPatternsPath = new Path(params.get(OUTPUT), FREQUENT_PATTERNS);
     FileSystem fs = FileSystem.get(frequentPatternsPath.toUri(), conf);
     FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath, FILE_PATTERN));
@@ -171,25 +229,26 @@ public final class PFPGrowth {
   *          params should contain input and output locations as a string value, the additional parameters
    *          include minSupport(3), maxHeapSize(50), numGroups(1000)
    */
-  public static void runPFPGrowth(Parameters params)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    startParallelCounting(params);
-    startGroupingItems(params);
-    startTransactionSorting(params);
-    startParallelFPGrowth(params);
-    startAggregating(params);
+  public static void runPFPGrowth(Parameters params) throws IOException,
+                                                    InterruptedException,
+                                                    ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+    startParallelCounting(params, conf);
+    startGroupingItems(params, conf);
+    startTransactionSorting(params, conf);
+    startParallelFPGrowth(params, conf);
+    startAggregating(params, conf);
   }
   
   /**
   * Run the aggregation Job to aggregate the different TopK patterns and group each Pattern by the features
    * present in it and thus calculate the final Top K frequent Patterns for each feature
    */
-  public static void startAggregating(Parameters params)
+  public static void startAggregating(Parameters params, Configuration conf)
     throws IOException, InterruptedException, ClassNotFoundException {
     
-    Configuration conf = new Configuration();
-    params.set(F_LIST, "");
-    params.set(G_LIST, "");
     conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
@@ -210,7 +269,7 @@ public final class PFPGrowth {
     job.setCombinerClass(AggregatorReducer.class);
     job.setReducerClass(AggregatorReducer.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+    
     HadoopUtil.delete(conf, outPath);
     job.waitForCompletion(true);
   }
@@ -221,8 +280,7 @@ public final class PFPGrowth {
    * @param params
    * @throws IOException
    */
-  public static void startGroupingItems(Parameters params) throws IOException {
-    Configuration conf = new Configuration();
+  public static void startGroupingItems(Parameters params, Configuration conf) throws IOException {
     List<Pair<String,Long>> fList = readFList(params);
     Integer numGroups = Integer.valueOf(params.get(NUM_GROUPS, "50"));
     
@@ -247,17 +305,15 @@ public final class PFPGrowth {
     
     log.info("No of Features: {}", fList.size());
     
-    params.set(G_LIST, serializeMap(gList, conf));
-    params.set(F_LIST, serializeList(fList, conf));
+    saveFList(fList, params, conf);
+    saveGList(gList, params, conf);
   }
   
   /**
    * Count the frequencies of various features in parallel using Map/Reduce
    */
-  public static void startParallelCounting(Parameters params)
+  public static void startParallelCounting(Parameters params, Configuration conf)
     throws IOException, InterruptedException, ClassNotFoundException {
-    
-    Configuration conf = new Configuration();
     conf.set(PFP_PARAMETERS, params.toString());
     
     conf.set("mapred.compress.map.output", "true");
@@ -289,12 +345,8 @@ public final class PFPGrowth {
   /**
   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
    */
-  public static void startTransactionSorting(Parameters params)
+  public static void startTransactionSorting(Parameters params, Configuration conf)
     throws IOException, InterruptedException, ClassNotFoundException {
-    
-    Configuration conf = new Configuration();
-    String gList = params.get(G_LIST);
-    params.set(G_LIST, "");
     conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
@@ -320,16 +372,13 @@ public final class PFPGrowth {
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     
     job.waitForCompletion(true);
-    params.set(G_LIST, gList);
   }
   
   /**
   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
    */
-  public static void startParallelFPGrowth(Parameters params)
+  public static void startParallelFPGrowth(Parameters params, Configuration conf)
     throws IOException, InterruptedException, ClassNotFoundException {
-    
-    Configuration conf = new Configuration();
     conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
@@ -357,30 +406,4 @@ public final class PFPGrowth {
     
     job.waitForCompletion(true);
   }
-  
-  /**
-   * Serializes the fList and returns the string representation of the List
-   *
-   * @return Serialized String representation of List
-   */
-  private static String serializeList(List<Pair<String,Long>> list, Configuration conf) throws IOException {
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
-    DefaultStringifier<List<Pair<String,Long>>> listStringifier = new DefaultStringifier<List<Pair<String,Long>>>(
-        conf, GenericsUtil.getClass(list));
-    return listStringifier.toString(list);
-  }
-  
-  /**
-   * Converts a given Map in to a String using DefaultStringifier of Hadoop
-   * 
-   * @return Serialized String representation of the GList Map
-   */
-  private static String serializeMap(Map<String,Long> map, Configuration conf) throws IOException {
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
-    DefaultStringifier<Map<String,Long>> mapStringifier = new DefaultStringifier<Map<String,Long>>(conf,
-        GenericsUtil.getClass(map));
-    return mapStringifier.toString(map);
-  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Wed Jun 22 22:29:35 2011
@@ -26,7 +26,6 @@ import java.util.Map.Entry;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.Parameters;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
 
@@ -47,9 +46,8 @@ public class ParallelFPGrowthMapper exte
       Integer[] prunedItems = pattern.getFirst().toArray(new Integer[pattern.getFirst().size()]);
 
       Collection<Long> groups = new HashSet<Long>();
-      for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
-        // dependent
-        // shards
+      for (int j = prunedItems.length - 1; j >= 0; j--) {
+        // generate group dependent shards
         Integer item = prunedItems[j];
         Long groupID = gListInt.get(item);
 
@@ -68,16 +66,13 @@ public class ParallelFPGrowthMapper exte
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
-    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
-    
     OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
     
-    for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration())
-        .entrySet()) {
+    for (Entry<String,Long> e : PFPGrowth.readGList(context.getConfiguration()).entrySet()) {
       gListInt.put(fMap.get(e.getKey()), e.getValue());
     }
     

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Wed Jun 22 22:29:35 2011
@@ -94,13 +94,13 @@ public class ParallelFPGrowthReducer ext
     Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
     
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
       featureReverseMap.add(e.getFirst());
       fMap.put(e.getFirst(), i++);
       
     }
     
-    Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration());
+    Map<String,Long> gList = PFPGrowth.readGList(context.getConfiguration());
     
     for (Entry<String,Long> entry : gList.entrySet()) {
       IntArrayList groupList = groupFeatures.get(entry.getValue());

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Wed Jun 22 22:29:35 2011
@@ -71,9 +71,9 @@ public class TransactionSortingMapper ex
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
-    
+
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
     

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java Wed Jun 22 22:29:35 2011
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.Parameters;
@@ -99,17 +100,17 @@ public class PFPGrowthRetailDataTest ext
       Long support = Long.parseLong(supportString.substring(1, supportString.length() - 1));
       expectedResults.put(new HashSet<String>(items), support);
     }
-    
+    Configuration conf = new Configuration();
     log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startParallelCounting(params);
+    PFPGrowth.startParallelCounting(params, conf);
     log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params);
+    PFPGrowth.startGroupingItems(params, conf);
     log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params);
-    PFPGrowth.startTransactionSorting(params);
-    PFPGrowth.startParallelFPGrowth(params);
+    PFPGrowth.startGroupingItems(params, conf);
+    PFPGrowth.startTransactionSorting(params, conf);
+    PFPGrowth.startParallelFPGrowth(params, conf);
     log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startAggregating(params);
+    PFPGrowth.startAggregating(params, conf);
     List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
     
     Map<Set<String>,Long> results = Maps.newHashMap();

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java Wed Jun 22 22:29:35 2011
@@ -22,7 +22,6 @@ import java.io.Writer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
@@ -82,25 +81,23 @@ public final class PFPGrowthTest extends
 
   @Test
   public void testStartParallelFPGrowth() throws Exception {
+    Configuration conf = new Configuration();
     log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startParallelCounting(params);
+    PFPGrowth.startParallelCounting(params, conf);
     log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
     log.info("{}", fList);
     assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
  
     log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params);
-    Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, new Configuration());
-    log.info("{}", gList);
-    assertEquals("{D=0, E=1, A=0, B=0, C=1}", gList.toString());
+    PFPGrowth.startGroupingItems(params, conf);
  
     log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startGroupingItems(params);
-    PFPGrowth.startTransactionSorting(params);
-    PFPGrowth.startParallelFPGrowth(params);
+    PFPGrowth.startGroupingItems(params, conf);
+    PFPGrowth.startTransactionSorting(params, conf);
+    PFPGrowth.startParallelFPGrowth(params, conf);
     log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
-    PFPGrowth.startAggregating(params);
+    PFPGrowth.startAggregating(params, conf);
     List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
     assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
                  + "(B,([B],6), ([B, D],4), ([B, A],4), ([B, D, A],3)), " 


