Return-Path: Delivered-To: apmail-lucene-mahout-commits-archive@minotaur.apache.org Received: (qmail 27874 invoked from network); 18 Oct 2009 22:20:23 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Oct 2009 22:20:23 -0000 Received: (qmail 54062 invoked by uid 500); 18 Oct 2009 22:20:23 -0000 Delivered-To: apmail-lucene-mahout-commits-archive@lucene.apache.org Received: (qmail 53979 invoked by uid 500); 18 Oct 2009 22:20:23 -0000 Mailing-List: contact mahout-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mahout-dev@lucene.apache.org Delivered-To: mailing list mahout-commits@lucene.apache.org Received: (qmail 53970 invoked by uid 99); 18 Oct 2009 22:20:23 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 18 Oct 2009 22:20:23 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 18 Oct 2009 22:20:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E5C9F23888FF; Sun, 18 Oct 2009 22:19:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r826539 [1/2] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/common/cache/ core/src/main/java/org/apache/mahout/fpm/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/... 
Date: Sun, 18 Oct 2009 22:19:57 -0000 To: mahout-commits@lucene.apache.org From: robinanil@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091018221957.E5C9F23888FF@eris.apache.org> Author: robinanil Date: Sun Oct 18 22:19:56 2009 New Revision: 826539 URL: http://svn.apache.org/viewvc?rev=826539&view=rev Log: MAHOUT-157 Parallel Frequent Pattern Mining using FPGrowth Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java 
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/package.html lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java Added: 
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * An Ordered List of Integers which can be used in a Hadoop Map/Reduce Job + * + * + */ +public class IntegerTuple implements Writable, WritableComparable { + + private List tuple = new ArrayList(); + + public IntegerTuple() { + } + + public IntegerTuple(Integer firstEntry) { + add(firstEntry); + } + + public IntegerTuple(Collection entries) { + for(Integer entry: entries) + add(entry); + } + + public IntegerTuple(Integer[] entries) { + for(Integer entry: entries) + add(entry); + } + + /** + * add an entry to the end of the list + * + * @param entry + * @return true if the items get added + */ + public boolean add(Integer entry) { + return tuple.add(entry); + } + + /** + * Fetches the string at the given location + * + * @param index + * @return String value at the given location in the tuple list + */ + public Integer integerAt(int index) { + return tuple.get(index); + } + + /** + * Replaces the string at the given index with the given newString + * + * @param index + * @param newString + * @return The previous value at that location + */ + public Integer replaceAt(int index, Integer newInteger) { + return tuple.set(index, newInteger); + } + + /** + * Fetch the list of entries from the tuple + * + * @return a List containing the strings in the order of insertion + */ + public List getEntries() { + return Collections.unmodifiableList(this.tuple); + } + + /** + * Returns the length of the tuple + * + * @return length + */ + public int length() { + return this.tuple.size(); + } + + @Override + public String toString() { + return tuple.toString(); + }; + + @Override + public int hashCode() { + return tuple.hashCode(); + } + + 
@Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + IntegerTuple other = (IntegerTuple) obj; + if (tuple == null) { + if (other.tuple != null) + return false; + } else if (!tuple.equals(other.tuple)) + return false; + return true; + } + + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + tuple = new ArrayList(len); + for (int i = 0; i < len; i++) { + int data = in.readInt(); + tuple.add(data); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(tuple.size()); + for (Integer entry : tuple) { + out.writeInt(entry); + } + } + + @Override + public int compareTo(IntegerTuple otherTuple) { + int min = Math.min(this.length(), otherTuple.length()); + for (int i = 0; i < min; i++) { + int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i)); + if (ret == 0) + continue; + return ret; + } + return this.length() - otherTuple.length(); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; + +public class StringRecordIterator implements Iterator> { + + private Iterator lineIterator; + private Pattern splitter = null; + public StringRecordIterator(FileLineIterable iterable, String pattern) + { + this.lineIterator = iterable.iterator(); + this.splitter = Pattern.compile(pattern); + } + @Override + public boolean hasNext() { + return lineIterator.hasNext(); + } + + @Override + public List next() { + String line = lineIterator.next(); + String[] items = splitter.split(line); + return Arrays.asList(items); + } + + @Override + public void remove() { + lineIterator.remove(); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.cache; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +public class LeastKCache, V> implements + Cache { + + private int capacity = 0; + + private Map cache = null; + + private PriorityQueue queue = null; + + public LeastKCache(final int capacity) { + + this.capacity = capacity; + + cache = new HashMap(capacity); + queue = new PriorityQueue(capacity, new Comparator() { + + @Override + public int compare(K o1, K o2) { + return o2.compareTo(o1); + } + + }); + + } + + @Override + final public long capacity() { + return capacity; + } + + @Override + final public V get(K key) { + return cache.get(key); + } + + @Override + final public void set(K key, V value) { + if (contains(key) == false) { + queue.add(key); + } + cache.put(key, value); + while (queue.size() > capacity) { + K k = queue.poll(); + cache.remove(k); + } + } + + @Override + final public long size() { + return cache.size(); + } + + @Override + final public boolean contains(K key) { + return (cache.containsKey(key)); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java URL: 
http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.common.Pair; +import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns; + +import java.io.IOException; +import java.util.*; + +/** + * + * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so + * that reducer can group them and select the top K frequent patterns + * + */ +public class AggregatorMapper extends + Mapper { + + @Override + protected void map(Text key, TopKStringPatterns values, Context context) + throws IOException, InterruptedException { + for (Pair, Long> pattern : values.getPatterns()) { + for (String item : pattern.getFirst()) { + List, Long>> patternSingularList = new ArrayList, Long>>(); + patternSingularList.add(pattern); + context.setStatus("Aggregator Mapper:Grouping Patterns for "+item); + context.write(new Text(item), + new TopKStringPatterns(patternSingularList)); + } + } + + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.common.Parameters; +import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns; +import java.io.IOException; +import java.util.*; + +/** + * + * {@link AggregatorReducer} groups all Frequent Patterns containing an item and + * outputs the top K patterns containing that particular item + * + */ +public class AggregatorReducer extends + Reducer { + + private int maxHeapSize = 50; + + @Override + protected void reduce(Text key, Iterable values, + Context context) throws IOException, InterruptedException { + TopKStringPatterns patterns = new TopKStringPatterns(); + Iterator it = values.iterator(); + while (it.hasNext()) { + context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key); + patterns = patterns.merge(it.next(), maxHeapSize); + } + context.write(key, patterns); + + } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + Parameters params = Parameters.fromString(context.getConfiguration().get( + "pfp.parameters", "")); + maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50")); + + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=826539&view=auto 
============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.fpm.pfpgrowth; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.GenericsUtil; +import org.apache.mahout.common.IntegerTuple; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.Parameters; +import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns; +import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * Parallel FP Growth Driver Class. 
Runs each stage of PFPGrowth as described in + * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf + * + */ +public class PFPGrowth { + private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class); + + public static Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*"); + + /** + * + * @param params params should contain input and output locations as a string + * value, the additional parameters include minSupport(3), + * maxHeapSize(50), numGroups(1000) + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + public static void runPFPGrowth(Parameters params) throws IOException, + InterruptedException, ClassNotFoundException { + startParallelCounting(params); + startGroupingItems(params); + startParallelFPGrowth(params); + startAggregating(params); + } + + /** + * Converts a given Map in to a String using DefaultStringifier of Hadoop + * + * @param map + * @param conf + * @return string representation of the map + * @throws IOException + */ + private static String serializeMap(Map map, Configuration conf) + throws IOException { + conf + .set( + "io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); + DefaultStringifier> mapStringifier = new DefaultStringifier>( + conf, GenericsUtil.getClass(map)); + String serializedMapString = mapStringifier.toString(map); + return serializedMapString; + } + + /** + * Generates the gList(Group ID Mapping of Various frequent Features) Map from + * the corresponding serialized representation + * + * @param params + * @param key + * @param conf + * @return + * @throws IOException + */ + public static Map deserializeMap(Parameters params, String key, + Configuration conf) throws IOException { + Map map = new HashMap(); + conf + .set( + "io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); + + DefaultStringifier> 
mapStringifier = new DefaultStringifier>( + conf, GenericsUtil.getClass(map)); + String gListString = mapStringifier.toString(map); + gListString = params.get(key, gListString); + map = mapStringifier.fromString(gListString); + return map; + } + + /** + * Serializes the fList and returns the string representation of the List + * + * @param list + * @param conf + * @return + * @throws IOException + */ + private static String serializeList(List> list, + Configuration conf) throws IOException { + conf + .set( + "io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); + DefaultStringifier>> listStringifier = new DefaultStringifier>>( + conf, GenericsUtil.getClass(list)); + String serializedListString = listStringifier.toString(list); + return serializedListString; + } + + /** + * Generates the fList from the serialized string representation + * + * @param params + * @param key + * @param conf + * @return + * @throws IOException + */ + public static List> deserializeList(Parameters params, + String key, Configuration conf) throws IOException { + List> list = new ArrayList>(); + conf + .set( + "io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); + + DefaultStringifier>> listStringifier = new DefaultStringifier>>( + conf, GenericsUtil.getClass(list)); + String serializedString = listStringifier.toString(list); + serializedString = params.get(key, serializedString); + list = listStringifier.fromString(serializedString); + return list; + } + + /** + * Count the frequencies of various features in parallel using Map/Reduce + * + * @param params + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + public static void startParallelCounting(Parameters params) + throws IOException, InterruptedException, ClassNotFoundException { + + Configuration conf = new Configuration(); + 
conf.set("pfp.parameters", params.toString()); + + String input = params.get("input"); + Job job = new Job(conf, "Parallel Counting Driver running over input: " + + input); + job.setJarByClass(PFPGrowth.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + + FileInputFormat.addInputPath(job, new Path(input)); + Path outPath = new Path(params.get("output") + "/parallelcounting"); + FileOutputFormat.setOutputPath(job, outPath); + + FileSystem dfs = FileSystem.get(outPath.toUri(), conf); + if (dfs.exists(outPath)) { + dfs.delete(outPath, true); + } + + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(ParallelCountingMapper.class); + job.setCombinerClass(ParallelCountingReducer.class); + job.setReducerClass(ParallelCountingReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + job.waitForCompletion(true); + + } + + /** + * read the feature frequency List which is built at the end of the Parallel + * counting job + * + * @param params + * @return + * @throws IOException + */ + public static List> readFList(Parameters params) + throws IOException { + Writable key = new Text(); + LongWritable value = new LongWritable(); + int minSupport = Integer.valueOf(params.get("minSupport", "3")); + Configuration conf = new Configuration(); + + FileSystem fs = FileSystem.get(new Path(params.get("output") + + "/parallelcounting").toUri(), conf); + FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + + "/parallelcounting/part-*")); + + PriorityQueue> queue = new PriorityQueue>( + 11, new Comparator>() { + + @Override + public int compare(Pair o1, Pair o2) { + int ret = o2.getSecond().compareTo(o1.getSecond()); + if (ret != 0) + return ret; + return o1.getFirst().compareTo(o2.getFirst()); + } + + }); + for (FileStatus fileStatus : outputFiles) { + Path path = fileStatus.getPath(); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + // key is feature value is 
count + while (reader.next(key, value)) { + if (value.get() < minSupport) + continue; + queue.add(new Pair(key.toString(), value.get())); + } + } + List> fList = new ArrayList>(); + while (queue.isEmpty() == false) + fList.add(queue.poll()); + return fList; + } + + /** + * Read the Frequent Patterns generated from Text + * + * @param params + * @return List of TopK patterns for each string frequent feature + * @throws IOException + */ + public static List> readFrequentPattern( + Parameters params) throws IOException { + + Configuration conf = new Configuration(); + + FileSystem fs = FileSystem.get(new Path(params.get("output") + + "/frequentPatterns").toUri(), conf); + FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + + "/frequentPatterns/part-*")); + + List> ret = new ArrayList>(); + for (FileStatus fileStatus : outputFiles) { + Path path = fileStatus.getPath(); + ret.addAll(FPGrowth.readFrequentPattern(fs, conf, path)); + } + return ret; + } + + /** + * Group the given Features into g groups as defined by the numGroups + * parameter in params + * + * @param params + * @throws IOException + */ + public static void startGroupingItems(Parameters params) throws IOException { + Configuration conf = new Configuration(); + List> fList = readFList(params); + Integer numGroups = Integer.valueOf(params.get("numGroups", "50")); + + Map gList = new HashMap(); + long groupID = 0; + long i = 0; + long maxPerGroup = fList.size() / numGroups; + if (fList.size() != maxPerGroup * numGroups) + maxPerGroup = maxPerGroup + 1; + + for (Pair featureFreq : fList) { + String feature = featureFreq.getFirst(); + if (i / (maxPerGroup) == groupID) { + gList.put(feature, groupID); + } else { + groupID++; + gList.put(feature, groupID); + } + i++; + } + + log.info("No of Features: {}", fList.size()); + + params.set("gList", serializeMap(gList, conf)); + params.set("fList", serializeList(fList, conf)); + } + + /** + * Run the Parallel FPGrowth Map/Reduce Job to calculate 
the Top K features of + * group dependent shards + * + * @param params + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + public static void startParallelFPGrowth(Parameters params) + throws IOException, InterruptedException, ClassNotFoundException { + + Configuration conf = new Configuration(); + conf.set("pfp.parameters", params.toString()); + + String input = params.get("input"); + Job job = new Job(conf, "PFP Growth Driver running over input" + input); + job.setJarByClass(PFPGrowth.class); + + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(IntegerTuple.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(TopKStringPatterns.class); + + FileInputFormat.addInputPath(job, new Path(input)); + Path outPath = new Path(params.get("output") + "/fpgrowth"); + FileOutputFormat.setOutputPath(job, outPath); + + FileSystem dfs = FileSystem.get(outPath.toUri(), conf); + if (dfs.exists(outPath)) { + dfs.delete(outPath, true); + } + + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(ParallelFPGrowthMapper.class); + job.setReducerClass(ParallelFPGrowthReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + job.waitForCompletion(true); + } + + /** + * Run the aggregation Job to aggregate the different TopK patterns and group + * each Pattern by the features present in it and thus calculate the final Top + * K frequent Patterns for each feature + * + * @param params + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + public static void startAggregating(Parameters params) throws IOException, + InterruptedException, ClassNotFoundException { + + Configuration conf = new Configuration(); + params.set("fList", ""); + params.set("gList", ""); + conf.set("pfp.parameters", params.toString()); + + String input = params.get("output") + "/fpgrowth"; + Job job = new Job(conf, "PFP Aggregator Driver running 
over input: " + + input); + job.setJarByClass(PFPGrowth.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(TopKStringPatterns.class); + + FileInputFormat.addInputPath(job, new Path(input)); + Path outPath = new Path(params.get("output") + "/frequentPatterns"); + FileOutputFormat.setOutputPath(job, outPath); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(AggregatorMapper.class); + job.setCombinerClass(AggregatorReducer.class); + job.setReducerClass(AggregatorReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + FileSystem dfs = FileSystem.get(outPath.toUri(), conf); + if (dfs.exists(outPath)) { + dfs.delete(outPath, true); + } + job.waitForCompletion(true); + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.common.Parameters; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * + * {@link ParallelCountingMapper} maps all items in a particular transaction + * like the way it is done in Hadoop WordCount example + * + */ +public class ParallelCountingMapper extends + Mapper { + + private static final LongWritable one = new LongWritable(1); + + private Pattern splitter = null; + + @Override + protected void map(LongWritable offset, Text input, Context context) + throws IOException, InterruptedException { + + String[] items = splitter.split(input.toString()); + for (String item : items){ + if(item.trim().length()==0) continue; + context.setStatus("Parallel Counting Mapper: "+ item); + context.write(new Text(item), one); + } + } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + Parameters params = Parameters.fromString(context.getConfiguration().get( + "pfp.parameters", "")); + splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER + .toString())); + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=826539&view=auto 
============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; +import java.util.Iterator; + +/** + * {@link ParallelCountingReducer} sums up the item count and output the item + * and the count This can also be used as a local Combiner. 
A simple summing + * reducer + */ +public class ParallelCountingReducer extends + Reducer { + + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + long sum = 0; + Iterator it = values.iterator(); + while (it.hasNext()) { + context.setStatus("Parallel Counting Reducer :" + key); + sum += it.next().get(); + } + context.setStatus("Parallel Counting Reducer: " + key + " => " + sum); + context.write(key, new LongWritable(sum)); + + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.common.IntegerTuple; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.Parameters; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.regex.Pattern; + +/** + * {@link ParallelFPGrowthMapper} maps each transaction to all unique items + * groups in the transaction. mapper outputs the group id as key and the + * transaction as value + * + */ +public class ParallelFPGrowthMapper extends + Mapper { + + private Map fMap = new HashMap(); + + private Map gListInt = new HashMap(); + + private Pattern splitter = null; + + @Override + protected void map(LongWritable offset, Text input, Context context) + throws IOException, InterruptedException { + + String[] items = splitter.split(input.toString()); + + List itemSet = new ArrayList(); + for (String item : items) // remove items not in the fList + { + if (fMap.containsKey(item) && item.trim().length() != 0) + itemSet.add(fMap.get(item)); + } + + Collections.sort(itemSet); + + Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]); + + Set groups = new HashSet(); + for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group + // dependent + // shards + Integer item = prunedItems[j]; + Long groupID = gListInt.get(item); + if (groups.contains(groupID) == false) { + Integer[] tempItems = new Integer[j + 1]; + System.arraycopy(prunedItems, 0, tempItems, 0, j + 1); + context + .setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + + item); + context.write(new LongWritable(groupID), new IntegerTuple(tempItems)); + } + groups.add(groupID); + } + + } + + @Override 
+ protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + Parameters params = Parameters.fromString(context.getConfiguration().get( + "pfp.parameters", "")); + + int i = 0; + for (Pair e : PFPGrowth.deserializeList(params, "fList", + context.getConfiguration())) { + fMap.put(e.getFirst(), i++); + } + + for (Entry e : PFPGrowth.deserializeMap(params, "gList", + context.getConfiguration()).entrySet()) { + gListInt.put(fMap.get(e.getKey()), e.getValue()); + } + splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER + .toString())); + + } +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.common.IntegerTuple; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.Parameters; +import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector; +import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor; +import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator; +import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns; +import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth; +import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * {@link ParallelFPGrowthReducer} takes each group of transactions and runs + * Vanilla FPGrowth on it and outputs the the Top K frequent Patterns for each + * group. 
+ * + */ + +public class ParallelFPGrowthReducer extends + Reducer { + + private List> fList = new ArrayList>(); + + private List featureReverseMap = new ArrayList(); + + private Map fMap = new HashMap(); + + private Map> groupFeatures = new HashMap>(); + + private int maxHeapSize = 50; + + private int minSupport = 3; + + @Override + public void reduce(LongWritable key, Iterable values, + Context context) throws IOException { + FPGrowth fpGrowth = new FPGrowth(); + fpGrowth + .generateTopKFrequentPatterns( + new IntegerTupleIterator(values.iterator()), + fList, + minSupport, + maxHeapSize, + new HashSet(groupFeatures.get(key.get())), + new IntegerStringOutputConvertor( + new ContextWriteOutputCollector( + context), featureReverseMap)); + } + + @Override + public void setup(Context context) throws IOException, InterruptedException { + + super.setup(context); + Parameters params = Parameters.fromString(context.getConfiguration().get( + "pfp.parameters", "")); + + + + int i = 0; + for(Pair e: PFPGrowth.deserializeList(params, "fList", context + .getConfiguration())) + { + featureReverseMap.add(e.getFirst()); + fMap.put(e.getFirst(), i); + fList.add(new Pair(i++, e.getSecond())); + } + + Map gList = PFPGrowth.deserializeMap(params, "gList", context + .getConfiguration()); + + for (Entry entry : gList.entrySet()) { + List groupList = groupFeatures.get(entry.getValue()); + Integer itemInteger = fMap.get(entry.getKey()); + if (groupList != null) + groupList.add(itemInteger); + else { + groupList = new ArrayList(); + groupList.add(itemInteger); + groupFeatures.put(entry.getValue(), groupList); + } + + } + maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50")); + minSupport = Integer.valueOf(params.get("minSupport", "3")); + FPTreeDepthCache.FirstLevelCacheSize = Integer.valueOf(params + .get("treeCacheSize", Integer + .toString(FPTreeDepthCache.FirstLevelCacheSize))); + } +} Added: 
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.fpm.pfpgrowth.convertors; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapreduce.Reducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContextWriteOutputCollector + implements OutputCollector { + + private static final Logger log = LoggerFactory + .getLogger(ContextWriteOutputCollector.class); + + private Reducer.Context context; + + public ContextWriteOutputCollector(Reducer.Context context) + throws IOException { + this.context = context; + } + + @Override + public final void collect(K key, V value) throws IOException { + try { + context.setStatus("Writing Top K patterns for: " + key.toString()); + context.write(key, value); + } catch (InterruptedException e) { + log.error("{}", e); + } + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors; + +import java.io.IOException; + +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; + +public class SequenceFileOutputCollector + implements OutputCollector { + private SequenceFile.Writer writer; + + public SequenceFileOutputCollector(SequenceFile.Writer writer) + throws IOException { + this.writer = writer; + } + + @Override + public final void collect(K key, V value) throws IOException { + writer.append(key, value); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.mahout.common.Pair; +import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap; +import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern; + +final public class TopKPatternsOutputConvertor implements + OutputCollector { + + private OutputCollector, Long>>> collector = null; + + private Map reverseMapping = null; + + public TopKPatternsOutputConvertor( + OutputCollector, Long>>> collector, + Map reverseMapping) { + this.collector = collector; + this.reverseMapping = reverseMapping; + } + + @Override + final public void collect(Integer key, FrequentPatternMaxHeap value) + throws IOException { + List, Long>> perAttributePatterns = new ArrayList, Long>>(); + for (Pattern itemSet : value.getHeap()) { + List frequentPattern = new ArrayList(); + for (int j = 0; j < itemSet.length(); j++) { + frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j])); + } + Pair, Long> returnItemSet = new Pair, Long>( + frequentPattern, itemSet.getSupport()); + perAttributePatterns.add(returnItemSet); + } + collector.collect(reverseMapping.get(key), perAttributePatterns); + } +} Added: 
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// Original location: core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Wraps an iterator over transactions (lists of attributes) and yields each
 * transaction as an array of integer ids. Attributes that have no entry in
 * the id mapping are dropped; the order of the remaining ones is kept.
 *
 * @param <AttributePrimitive> attribute type of the incoming transactions
 */
public class TransactionIterator<AttributePrimitive> implements Iterator<int[]> {
  private Map<AttributePrimitive, Integer> attributeIdMapping = null;

  private Iterator<List<AttributePrimitive>> iterator = null;

  /** Scratch space reused between calls to {@link #next()}. */
  int[] transactionBuffer = null;

  /**
   * @param iterator source of transactions
   * @param attributeIdMapping attribute to integer id
   */
  public TransactionIterator(Iterator<List<AttributePrimitive>> iterator,
      Map<AttributePrimitive, Integer> attributeIdMapping) {
    this.attributeIdMapping = attributeIdMapping;
    this.iterator = iterator;
    transactionBuffer = new int[attributeIdMapping.size()];
  }

  @Override
  public boolean hasNext() {
    return iterator.hasNext();
  }

  /**
   * Returns the next transaction translated to ids.
   *
   * @return a new array holding the ids of the mapped attributes, in
   *         transaction order
   */
  @Override
  public int[] next() {
    List<AttributePrimitive> transaction = iterator.next();

    // A transaction may repeat an attribute, so it can hold more mapped
    // entries than the mapping has keys. Grow the scratch buffer instead of
    // overrunning it.
    if (transactionBuffer.length < transaction.size()) {
      transactionBuffer = new int[transaction.size()];
    }

    int index = 0;
    for (AttributePrimitive attribute : transaction) {
      Integer id = attributeIdMapping.get(attribute);
      if (id != null) {
        transactionBuffer[index++] = id;
      }
    }

    int[] transactionList = new int[index];
    System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
    return transactionList;
  }

  /** Delegates to the wrapped iterator. */
  @Override
  public void remove() {
    iterator.remove();
  }

}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors.integer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.mahout.common.Pair; +import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns; + +public final class IntegerStringOutputConvertor implements + OutputCollector, Long>>> { + + OutputCollector collector = null; + + List featureReverseMap = null; + + public IntegerStringOutputConvertor( + OutputCollector collector, + List featureReverseMap) { + this.collector = collector; + this.featureReverseMap = featureReverseMap; + } + + @Override + final public void collect(Integer key, List, Long>> value) + throws IOException { + String StringKey = featureReverseMap.get(key); + List, Long>> stringValues = new ArrayList, Long>>(); + for (Pair, Long> e : value) { + List pattern = new ArrayList(); + for (Integer i : e.getFirst()) { + pattern.add(featureReverseMap.get(i)); + } + stringValues.add(new Pair, Long>(pattern, e.getSecond())); + } + + collector + .collect(new Text(StringKey), new TopKStringPatterns(stringValues)); + } + +} Added: 
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.fpm.pfpgrowth.convertors.integer; + +import java.util.Iterator; +import java.util.List; + +import org.apache.mahout.common.IntegerTuple; + +public final class IntegerTupleIterator implements Iterator> { + + private Iterator iterator = null; + + public IntegerTupleIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + final public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + final public List next() { + IntegerTuple transaction = iterator.next(); + return transaction.getEntries(); + } + + @Override + final public void remove() { + iterator.remove(); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors.string; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.mahout.common.Pair; + +public final class StringOutputConvertor implements + OutputCollector, Long>>> { + + OutputCollector collector = null; + + public StringOutputConvertor( + OutputCollector collector) { + this.collector = collector; + } + + @Override + final public void collect(String key, List, Long>> value) + throws IOException { + collector.collect(new Text(key), new TopKStringPatterns(value)); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors.string; + +import java.util.Iterator; +import java.util.List; + +import org.apache.mahout.common.StringTuple; + +public final class StringTupleIterator implements Iterator> { + + private Iterator iterator = null; + + public StringTupleIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + final public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + final public List next() { + StringTuple transaction = iterator.next(); + return transaction.getEntries(); + } + + @Override + final public void remove() { + iterator.remove(); + } + +} Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=826539&view=auto ============================================================================== --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java (added) +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java Sun Oct 18 22:19:56 2009 @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.fpm.pfpgrowth.convertors.string; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.Pair; + +public final class TopKStringPatterns implements Writable { + private List<Pair<List<String>, Long>> frequentPatterns = null; + + public TopKStringPatterns() { + frequentPatterns = new ArrayList<Pair<List<String>, Long>>(); + } + + public TopKStringPatterns(List<Pair<List<String>, Long>> patterns) { + frequentPatterns = new ArrayList<Pair<List<String>, Long>>(); + frequentPatterns.addAll(patterns); + } + + final public Iterator<Pair<List<String>, Long>> iterator() { + return frequentPatterns.iterator(); + } + + final public List<Pair<List<String>, Long>> getPatterns() { + return frequentPatterns; + } + + final public TopKStringPatterns merge(TopKStringPatterns pattern, int heapSize) { + List<Pair<List<String>, Long>> patterns = new ArrayList<Pair<List<String>, Long>>(); + Iterator<Pair<List<String>, Long>> myIterator = frequentPatterns.iterator(); + Iterator<Pair<List<String>, Long>> otherIterator = pattern.iterator(); + Pair<List<String>, Long> myItem = null; + Pair<List<String>, Long> otherItem = null; + for (int i = 0; i < heapSize; i++) { + if (myItem == null && myIterator.hasNext()) + myItem = myIterator.next(); + if
(otherItem == null && otherIterator.hasNext()) + otherItem = otherIterator.next(); + if (myItem != null && otherItem != null) { + int cmp = myItem.getSecond().compareTo(otherItem.getSecond()); + if (cmp == 0) { + cmp = myItem.getFirst().size() - otherItem.getFirst().size(); + if (cmp == 0) { + for (int j = 0; j < myItem.getFirst().size(); j++) { + cmp = myItem.getFirst().get(j).compareTo( + otherItem.getFirst().get(j)); + if (cmp == 0) + continue; + else break; + } + } + } + if (cmp <= 0) { + patterns.add(otherItem); + if (cmp == 0) { + myItem = null; + } + otherItem = null; + } else if (cmp > 0) { + patterns.add(myItem); + myItem = null; + } + } else if (myItem != null) { + patterns.add(myItem); + myItem = null; + } else if (otherItem != null) { + patterns.add(otherItem); + otherItem = null; + } else + break; + } + return new TopKStringPatterns(patterns); + } + + @Override + final public void readFields(DataInput in) throws IOException { + frequentPatterns.clear(); + int length = in.readInt(); + for (int i = 0; i < length; i++) { + List<String> items = new ArrayList<String>(); + int itemsetLength = in.readInt(); + long support = in.readLong(); + for (int j = 0; j < itemsetLength; j++) { + int itemLength = in.readInt(); + byte[] data = new byte[itemLength]; + in.readFully(data); + items.add(Bytes.toString(data)); + } + frequentPatterns.add(new Pair<List<String>, Long>(items, support)); + } + } + + @Override + final public void write(DataOutput out) throws IOException { + out.writeInt(frequentPatterns.size()); + for (Pair<List<String>, Long> pattern : frequentPatterns) { + out.writeInt(pattern.getFirst().size()); + out.writeLong(pattern.getSecond()); + for (String item : pattern.getFirst()) { + byte[] data = Bytes.toBytes(item); + out.writeInt(data.length); + out.write(data); + } + } + } + + @Override + final public String toString() { + StringBuilder sb = new StringBuilder(); + String sep = ""; + for (Pair<List<String>, Long> pattern : frequentPatterns) { + sb.append(sep); + sb.append(pattern.toString()); + sep = ", ";
+ + } + return sb.toString(); + + } +}