mahout-commits mailing list archives

From sro...@apache.org
Subject svn commit: r893615 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence: ./ Bigram.java ItemBigramGenerator.java ItemSimilarityEstimator.java TupleWritable.java UserItemJoiner.java UserItemRecommender.java
Date Wed, 23 Dec 2009 19:51:09 GMT
Author: srowen
Date: Wed Dec 23 19:51:09 2009
New Revision: 893615

URL: http://svn.apache.org/viewvc?rev=893615&view=rev
Log:
MAHOUT-103

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/Bigram.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemBigramGenerator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemSimilarityEstimator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemJoiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemRecommender.java

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/Bigram.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/Bigram.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/Bigram.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/Bigram.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+public final class Bigram implements WritableComparable<Bigram> {
+
+  private int first;
+  private int second;
+
+  public Bigram() {
+    set(-1, -1);
+  }
+
+  public Bigram(Bigram bigram) {
+    set(bigram.first, bigram.second);
+  }
+
+  public Bigram(int first, int second) {
+    set(first, second);
+  }
+
+  public void set(int first, int second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public int getFirst() {
+    return first;
+  }
+
+  public int getSecond() {
+    return second;
+  }
+
+  /** Read the two integers encoded using variable length encoding */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    first = WritableUtils.readVInt(in);
+    second = WritableUtils.readVInt(in);
+  }
+
+  /** Write the two integers encoded using variable length encoding */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, first);
+    WritableUtils.writeVInt(out, second);
+  }
+
+  @Override
+  public int hashCode() {
+    return first * 157 + second;
+  }
+
+  @Override
+  public boolean equals(Object right) {
+    if (right == null) {
+      return false;
+    }
+    if (right instanceof Bigram) {
+      Bigram r = (Bigram) right;
+      return r.getFirst() == first && r.second == second;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int compareTo(Bigram o) {
+    if (o == null) {
+      return 1;
+    }
+    int ret = first - o.first;
+    if (ret == 0) {
+      ret = second - o.second;
+    }
+    return ret;
+  }
+
+  @Override
+  public String toString() {
+    return first + "\t" + second;
+  }
+
+  /** A Comparator that compares serialized Bigrams. */
+  public static class Comparator extends WritableComparator implements Serializable {
+
+    public Comparator() {
+      super(Bigram.class);
+    }
+
+    /** Compare variable-length encoded numbers. */
+    @Override
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      int ret = -1;
+      try {
+        int firstb1 = WritableComparator.readVInt(b1, s1);
+        int firstb2 = WritableComparator.readVInt(b2, s2);
+        ret = firstb1 - firstb2;
+        if (ret == 0) {
+          int secondb1 = WritableComparator.readVInt(b1, s1 + WritableUtils.decodeVIntSize(b1[s1]));
+          int secondb2 = WritableComparator.readVInt(b2, s2 + WritableUtils.decodeVIntSize(b2[s2]));
+          ret = secondb1 - secondb2;
+        }
+      } catch (IOException ioe) {
+        throw new IllegalArgumentException(ioe);
+      }
+      return ret;
+    }
+  }
+
+  static {                                        // register this comparator
+    WritableComparator.define(Bigram.class, new Comparator());
+  }
+
+  /** Compare only the first part of the bigram, so that reduce is called once for each value of the first part. */
+  public static class FirstGroupingComparator
+      extends WritableComparator implements Serializable {
+
+    public FirstGroupingComparator() {
+      super(Bigram.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int ret;
+      try {
+        int firstb1 = WritableComparator.readVInt(b1, s1);
+        int firstb2 = WritableComparator.readVInt(b2, s2);
+        ret = firstb1 - firstb2;
+      } catch (IOException ioe) {
+        throw new IllegalArgumentException(ioe);
+      }
+      return ret;
+    }
+
+    @Override
+    public int compare(Object o1, Object o2) {
+      if (o1 == null) {
+        return -1;
+      } else if (o2 == null) {
+        return 1;
+      } else {
+        int firstb1 = ((Bigram) o1).getFirst();
+        int firstb2 = ((Bigram) o2).getFirst();
+        return firstb1 - firstb2;
+      }
+    }
+
+  }
+
+  /** A wrapper class that associates a bigram with its frequency (number of occurrences). */
+  public static class Frequency implements Comparable<Frequency> {
+
+    private Bigram bigram = new Bigram();
+    private double frequency = 0.0;
+
+    public double getFrequency() {
+      return frequency;
+    }
+
+    public Bigram getBigram() {
+      return bigram;
+    }
+
+    public Frequency(Bigram bigram, double frequency) {
+      this.bigram = new Bigram(bigram);
+      this.frequency = frequency;
+    }
+
+    @Override
+    public int hashCode() {
+      return bigram.hashCode() + (int) Math.abs(Math.round(frequency * 31));
+    }
+
+    @Override
+    public boolean equals(Object right) {
+      if (right == null || !(right instanceof Frequency)) {
+        return false;
+      }
+      Frequency that = (Frequency) right;
+      return this.compareTo(that) == 0;
+    }
+
+    @Override
+    public int compareTo(Frequency that) {
+      return Double.compare(this.frequency, that.frequency);
+    }
+
+    @Override
+    public String toString() {
+      return bigram + "\t" + frequency;
+    }
+  }
+}
\ No newline at end of file
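
For illustration only (this sketch is not part of the commit): how the Writable round trip and the statically registered raw comparator behave; Hadoop's DataOutputBuffer is used here purely as a scratch buffer.

    // Sketch: serialize two Bigrams and compare their raw bytes without deserializing.
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.Bigram;

    public class BigramComparatorDemo {
      public static void main(String[] args) throws Exception {
        DataOutputBuffer a = new DataOutputBuffer();
        new Bigram(3, 7).write(a);
        DataOutputBuffer b = new DataOutputBuffer();
        new Bigram(3, 9).write(b);
        // The comparator registered in Bigram's static block orders by first, then second.
        WritableComparator cmp = WritableComparator.get(Bigram.class);
        int order = cmp.compare(a.getData(), 0, a.getLength(),
                                b.getData(), 0, b.getLength());
        System.out.println(order);  // negative: (3, 7) sorts before (3, 9)
      }
    }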

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemBigramGenerator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemBigramGenerator.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemBigramGenerator.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemBigramGenerator.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+
+public final class ItemBigramGenerator extends Configured implements Tool {
+
+  public static class UserItemMapper extends MapReduceBase
+      implements Mapper<LongWritable, Text, VIntWritable, VIntWritable> {
+
+    private static final Logger log = LoggerFactory.getLogger(UserItemMapper.class);
+
+    private final VIntWritable user = new VIntWritable(0);
+    private final VIntWritable item = new VIntWritable(0);
+
+    private String fieldSeparator;
+
+    enum Records {
+      INVALID_IDS,
+      INVALID_SCHEMA
+    }
+
+    @Override
+    public void configure(JobConf conf) {
+      fieldSeparator = conf.get("user.preference.field.separator", "\t");
+    }
+
+    @Override
+    public void map(LongWritable lineNumber, Text userPrefEntry, OutputCollector<VIntWritable, VIntWritable> output,
+                    Reporter reporter)
+        throws IOException {
+      String userPrefLine = userPrefEntry.toString();
+      String[] prefFields = userPrefLine.split(fieldSeparator);
+      if (prefFields.length > 1) {
+        try {
+          int userId = Integer.parseInt(prefFields[0]);
+          int itemId = Integer.parseInt(prefFields[1]);
+          user.set(userId);
+          item.set(itemId);
+          output.collect(user, item);
+        } catch (NumberFormatException nfe) {
+          reporter.incrCounter(Records.INVALID_IDS, 1);
+          log.warn("Invalid IDs in record: {}", userPrefLine);
+        } catch (IllegalArgumentException iae) {
+          reporter.incrCounter(Records.INVALID_IDS, 1);
+          log.warn("Invalid IDs in record: {}", userPrefLine);
+        }
+      } else {
+        reporter.incrCounter(Records.INVALID_SCHEMA, 1);
+        log.warn("No preference found in record: {}", userPrefLine);
+      }
+    }
+  }
+
+  public static class UserItemReducer extends MapReduceBase
+      implements Reducer<VIntWritable, VIntWritable, VIntWritable, VIntWritable> {
+
+    enum User {
+      TOO_FEW_ITEMS,
+      TOO_MANY_ITEMS
+    }
+
+    @Override
+    public void reduce(VIntWritable user, Iterator<VIntWritable> itemIterator,
+                       OutputCollector<VIntWritable, VIntWritable> output, Reporter reporter)
+        throws IOException {
+      Collection<VIntWritable> itemSet = new HashSet<VIntWritable>();
+      while (itemIterator.hasNext()) {
+        itemSet.add(new VIntWritable(itemIterator.next().get()));
+      }
+
+      if (itemSet.size() <= 2) {
+        reporter.incrCounter(User.TOO_FEW_ITEMS, 1);
+        return;
+      }
+
+      if (itemSet.size() >= 10000) {
+        reporter.incrCounter(User.TOO_MANY_ITEMS, 1);
+        return;
+      }
+
+      VIntWritable[] items = itemSet.toArray(new VIntWritable[itemSet.size()]);
+
+      for (int i = 0; i < items.length; i++) {
+        for (int j = i + 1; j < items.length; j++) {
+          if (i != j) {
+            output.collect(items[i], items[j]);
+            output.collect(items[j], items[i]);
+          }
+        }
+      }
+    }
+
+  }
+
+  public JobConf prepareJob(String inputPaths, Path outputPath, int reducers) {
+    JobConf job = new JobConf(getConf());
+    job.setJarByClass(this.getClass());
+    job.setJobName("Item Bigram Generator");
+
+    job.setMapperClass(UserItemMapper.class);
+    job.setReducerClass(UserItemReducer.class);
+
+    job.setOutputKeyClass(VIntWritable.class);
+    job.setOutputValueClass(VIntWritable.class);
+
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      SequenceFile.CompressionType.BLOCK);
+
+    job.setNumReduceTasks(reducers);
+
+    FileInputFormat.addInputPaths(job, inputPaths);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    return job;
+  }
+
+  @Override
+  public int run(String[] args) throws IOException {
+    // TODO use Commons CLI 2    
+    if (args.length < 2) {
+      System.out.println("Usage: ItemBigramGemerator <input-dir> <output-dir>
[reducers]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    String inputPaths = args[0];
+    Path outputPath = new Path(args[1]);
+    int reducers = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+    JobConf jobConf = prepareJob(inputPaths, outputPath, reducers);
+    RunningJob job = JobClient.runJob(jobConf);
+    return 0;
+  }
+
+}
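
As a reading aid (not part of the commit): with the default "user.preference.field.separator" of a tab, UserItemMapper expects one preference per line, for example

    42	100
    42	101
    42	102

UserItemReducer collects the distinct items per user and emits every ordered item pair, here (100,101), (101,100), (100,102), (102,100), (101,102), (102,101). Note that, as written, users with two or fewer distinct items (or with 10000 or more) are skipped and only recorded against the corresponding counters.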

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemSimilarityEstimator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemSimilarityEstimator.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemSimilarityEstimator.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/ItemSimilarityEstimator.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+
+/**
+ * This class consumes all the item bigrams generated by ItemBigramGenerator. The input is
+ * partitioned on the first item of the bigram, distributed and sorted by the map-reduce
+ * framework, and grouped on the first item of the bigram so that each reducer sees all the
+ * bigrams for each unique first item.
+ */
+public final class ItemSimilarityEstimator extends Configured implements Tool {
+
+  /** Partition based on the first part of the bigram. */
+  public static class FirstPartitioner implements Partitioner<Bigram, Writable> {
+
+    @Override
+    public int getPartition(Bigram key, Writable value,
+                            int numPartitions) {
+      return Math.abs(key.getFirst() % numPartitions);
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      // Nothing to do here      
+    }
+  }
+
+  /** Output K -> (item1, item2), V -> (item2, 1) */
+  public static class ItemItemMapper extends MapReduceBase
+      implements Mapper<VIntWritable, VIntWritable, Bigram, Bigram> {
+
+    private final Bigram keyBigram = new Bigram();
+    private final Bigram valueBigram = new Bigram();
+    private static final int ONE = 1;
+
+    @Override
+    public void map(VIntWritable item1, VIntWritable item2, OutputCollector<Bigram, Bigram> output, Reporter reporter)
+        throws IOException {
+      keyBigram.set(item1.get(), item2.get());
+      valueBigram.set(item2.get(), ONE);
+      output.collect(keyBigram, valueBigram);
+    }
+  }
+
+
+  /** Combiner that partially sums co-occurrence counts before the shuffle. */
+  public static class ItemItemCombiner extends MapReduceBase
+      implements Reducer<Bigram, Bigram, Bigram, Bigram> {
+
+    @Override
+    public void reduce(Bigram item, Iterator<Bigram> similarItemItr,
+                       OutputCollector<Bigram, Bigram> output, Reporter reporter)
+        throws IOException {
+      int count = 0;
+      while (similarItemItr.hasNext()) {
+        Bigram candItem = similarItemItr.next();
+        count += candItem.getSecond();
+      }
+      Bigram similarItem = new Bigram(item.getSecond(), count);
+      output.collect(item, similarItem);
+    }
+  }
+
+  /** All sorted bigrams for item1 are received in reduce. <p/> K -> (item1, item2), V -> (FREQ) */
+  public static class ItemItemReducer extends MapReduceBase
+      implements Reducer<Bigram, Bigram, Bigram, DoubleWritable> {
+
+    private PriorityQueue<Bigram.Frequency> freqBigrams = new PriorityQueue<Bigram.Frequency>();
+    private Bigram key = new Bigram();
+    private DoubleWritable value = new DoubleWritable();
+
+    private long maxFrequentItems;
+
+    @Override
+    public void configure(JobConf conf) {
+      maxFrequentItems = conf.getLong("max.frequent.items", 20);
+    }
+
+    @Override
+    public void reduce(Bigram item, Iterator<Bigram> simItemItr,
+                       OutputCollector<Bigram, DoubleWritable> output, Reporter reporter)
+        throws IOException {
+      int itemId = item.getFirst();
+      int prevItemId = item.getSecond();
+      int prevCount = 0;
+      while (simItemItr.hasNext()) {
+        Bigram curItem = simItemItr.next();
+        int curItemId = curItem.getFirst();
+        int curCount = curItem.getSecond();
+        if (prevItemId == curItemId) {
+          prevCount += curCount;
+        } else {
+          enqueue(itemId, prevItemId, prevCount);
+          prevItemId = curItemId;
+          prevCount = curCount;
+        }
+      }
+      enqueue(itemId, prevItemId, prevCount);
+      dequeueAll(output);
+    }
+
+    private void enqueue(int first, int second, int count) {
+      Bigram freqBigram = new Bigram(first, second);
+      freqBigrams.add(new Bigram.Frequency(freqBigram, count));
+      if (freqBigrams.size() > maxFrequentItems) {
+        freqBigrams.poll();
+      }
+    }
+
+    private void dequeueAll(OutputCollector<Bigram, DoubleWritable> output) throws IOException {
+      double totalScore = 0;
+      for (Bigram.Frequency freqBigram : freqBigrams) {
+        totalScore += freqBigram.getFrequency();
+      }
+      // normalize the co-occurrence based counts.
+      for (Bigram.Frequency freqBigram : freqBigrams) {
+        key = freqBigram.getBigram();
+        value = new DoubleWritable(freqBigram.getFrequency() / totalScore);
+        output.collect(key, value);
+      }
+      freqBigrams.clear();
+    }
+  }
+
+
+  public JobConf prepareJob(String inputPaths, Path outputPath, int maxFreqItems, int reducers)
+      throws IOException {
+    JobConf job = new JobConf(getConf());
+    job.setJobName("Item Bigram Counter");
+    job.setJarByClass(this.getClass());
+
+    job.setMapperClass(ItemItemMapper.class);
+    job.setCombinerClass(ItemItemCombiner.class);
+    job.setReducerClass(ItemItemReducer.class);
+
+    job.setMapOutputKeyClass(Bigram.class);
+    job.setMapOutputValueClass(Bigram.class);
+    job.setOutputKeyClass(Bigram.class);
+    job.setOutputValueClass(DoubleWritable.class);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      SequenceFile.CompressionType.BLOCK);
+
+    job.setPartitionerClass(FirstPartitioner.class);
+    job.setOutputValueGroupingComparator(Bigram.FirstGroupingComparator.class);
+
+    job.setInt("max.frequent.items", maxFreqItems);
+    job.setNumReduceTasks(reducers);
+    FileInputFormat.addInputPaths(job, inputPaths);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    return job;
+  }
+
+  @Override
+  public int run(String[] args) throws IOException {
+    // TODO use Commons CLI 2
+    if (args.length < 2) {
+      System.out
+          .println("ItemSimilarityEstimator <input-dirs> <output-dir> "
+              + "[max-frequent-items] [reducers]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    String inputPaths = args[0];
+    Path outputPath = new Path(args[1]);
+    int maxFreqItems = args.length > 2 ? Integer.parseInt(args[2]) : 20;
+    int reducers = args.length > 3 ? Integer.parseInt(args[3]) : 1;
+    JobConf jobConf = prepareJob(inputPaths, outputPath, maxFreqItems, reducers);
+    RunningJob job = JobClient.runJob(jobConf);
+    return 0;
+  }
+
+}
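
As a reading aid (not part of the commit), a small worked example of the reducer's normalization: suppose item 100 co-occurs three times with item 101 and once with item 102, and max.frequent.items is not exceeded. The combined counts are 3 and 1, totalScore is 4, and the reducer emits (100, 101) -> 0.75 and (100, 102) -> 0.25, i.e. co-occurrence counts normalized to sum to 1 for each first item.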

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,129 @@
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.GenericWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+
+public final class TupleWritable extends ArrayWritable {
+
+  public static class Field extends GenericWritable {
+
+    private static final Class<?>[] CLASSES = {
+        VIntWritable.class,
+        VLongWritable.class,
+        DoubleWritable.class,
+        Text.class
+    };
+
+    @Override
+    protected Class<? extends Writable>[] getTypes() {
+      return (Class<? extends Writable>[]) CLASSES;
+    }
+
+    public Field() {
+    }
+
+    public Field(Writable writable) {
+      super.set(writable);
+    }
+  }
+
+  public TupleWritable() {
+    super(Field.class);
+  }
+
+  public TupleWritable(int size) {
+    this();
+    super.set(new Writable[size]);
+  }
+
+  public TupleWritable(Writable... writables) {
+    this();
+    Writable[] fields = new Writable[writables.length];
+    int i = 0;
+    for (Writable writable : writables) {
+      fields[i++] = new Field(writable);
+    }
+    super.set(fields);
+  }
+
+
+  private boolean valid(int idx) {
+    Writable[] fields = get();
+    int length = (fields == null) ? 0 : fields.length;
+    return (idx >= 0 && idx < length);
+  }
+
+
+  private void allocateCapacity() {
+    Writable[] oldFields = get();
+    int oldCapacity = oldFields == null ? 0 : oldFields.length;
+    int newCapacity = (oldCapacity + 1) << 1;
+    Writable[] newFields = new Writable[newCapacity];
+    if (oldFields != null && oldCapacity > 0) {
+      System.arraycopy(oldFields, 0, newFields, 0, oldFields.length);
+    }
+    set(newFields);
+  }
+
+  public void set(int idx, Writable field) {
+    if (!valid(idx)) {
+      allocateCapacity();
+    }
+    Writable[] fields = get();
+    fields[idx] = new Field(field);
+  }
+
+
+  public Field get(int idx) {
+    if (!valid(idx)) {
+      throw new IllegalArgumentException("Invalid index: " + idx);
+    }
+    return (Field) (get()[idx]);
+  }
+
+  public int getInt(int idx) {
+    Field field = get(idx);
+    Class<? extends Writable> wrappedClass = field.get().getClass();
+    if (wrappedClass.equals(VIntWritable.class)) {
+      return ((VIntWritable) field.get()).get();
+    } else {
+      throw new IllegalArgumentException("Not an integer: " + wrappedClass);
+    }
+  }
+
+  public long getLong(int idx) {
+    Field field = get(idx);
+    Class<? extends Writable> wrappedClass = field.get().getClass();
+    if (wrappedClass.equals(VLongWritable.class)) {
+      return ((VLongWritable) field.get()).get();
+    } else {
+      throw new IllegalArgumentException("Not a long: " + wrappedClass);
+    }
+  }
+
+  public double getDouble(int idx) {
+    Field field = get(idx);
+    Class<? extends Writable> wrappedClass = field.get().getClass();
+    if (wrappedClass.equals(DoubleWritable.class)) {
+      return ((DoubleWritable) field.get()).get();
+    } else {
+      throw new IllegalArgumentException("Not an double: " + wrappedClass);
+    }
+  }
+
+  public Text getText(int idx) {
+    Field field = get(idx);
+    Class<? extends Writable> wrappedClass = field.get().getClass();
+    if (wrappedClass.equals(Text.class)) {
+      return (Text) field.get();
+    } else {
+      throw new IllegalArgumentException("Not an double: " + wrappedClass);
+    }
+  }
+
+}
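
For illustration only (this sketch is not part of the commit): constructing a three-field tuple of the shape the joiner emits and reading it back with the typed accessors.

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.VIntWritable;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.TupleWritable;

    public class TupleWritableDemo {
      public static void main(String[] args) {
        TupleWritable tuple = new TupleWritable(3);
        tuple.set(0, new VIntWritable(100));      // seen item
        tuple.set(1, new VIntWritable(101));      // similar item
        tuple.set(2, new DoubleWritable(0.75));   // similarity score
        // Each field is wrapped in a GenericWritable Field; the typed getters unwrap it.
        System.out.println(tuple.getInt(0) + "\t" + tuple.getInt(1) + "\t" + tuple.getDouble(2));
      }
    }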

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemJoiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemJoiner.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemJoiner.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemJoiner.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+public final class UserItemJoiner extends Configured implements Tool {
+
+  public static class JoinUserMapper extends MapReduceBase
+      implements Mapper<LongWritable, Text, Bigram, TupleWritable> {
+
+    private static final Logger log = LoggerFactory.getLogger(JoinUserMapper.class);
+
+    private final Bigram joinKey = new Bigram();
+    private final TupleWritable tuple = new TupleWritable(2);
+    private final VIntWritable user = new VIntWritable();
+
+    private final VIntWritable keyID = new VIntWritable(1);
+    private String fieldSeparator;
+
+    @Override
+    public void configure(JobConf conf) {
+      fieldSeparator = conf.get("user.preference.field.separator", "\t");
+    }
+
+    @Override
+    public void map(LongWritable lineNumber, Text userPrefEntry, OutputCollector<Bigram, TupleWritable> output,
+                    Reporter reporter) throws IOException {
+      String userPrefLine = userPrefEntry.toString();
+      String[] prefFields = userPrefLine.split(fieldSeparator);
+      if (prefFields.length > 1) {
+        int userId = Integer.parseInt(prefFields[0]);
+        int itemId = Integer.parseInt(prefFields[1]);
+        user.set(userId);
+        tuple.set(0, keyID);
+        tuple.set(1, user);
+        joinKey.set(itemId, keyID.get());
+        output.collect(joinKey, tuple);
+      } else {
+        log.warn("No preference found in record: {}", userPrefLine);
+      }
+    }
+  }
+
+  public static class JoinItemMapper extends MapReduceBase
+      implements Mapper<Bigram, DoubleWritable, Bigram, TupleWritable> {
+
+    private final VIntWritable simItem = new VIntWritable();
+    private final TupleWritable tuple = new TupleWritable(3);
+    private final Bigram joinKey = new Bigram();
+
+    private final VIntWritable keyID = new VIntWritable(0);
+
+    @Override
+    public void map(Bigram itemBigram, DoubleWritable simScore, OutputCollector<Bigram, TupleWritable> output,
+                    Reporter reporter) throws IOException {
+      joinKey.set(itemBigram.getFirst(), keyID.get());
+      simItem.set(itemBigram.getSecond());
+      tuple.set(0, keyID);
+      tuple.set(1, simItem);
+      tuple.set(2, simScore);
+      output.collect(joinKey, tuple);
+    }
+  }
+
+  public static class JoinItemUserReducer extends MapReduceBase
+      implements Reducer<Bigram, TupleWritable, VIntWritable, TupleWritable> {
+
+    private final Collection<TupleWritable> cachedSimilarItems = new ArrayList<TupleWritable>();
+
+    private final VIntWritable user = new VIntWritable();
+
+    @Override
+    public void reduce(Bigram itemBigram, Iterator<TupleWritable> tuples,
+                       OutputCollector<VIntWritable, TupleWritable> output, Reporter reporter) throws IOException {
+      int seenItem = itemBigram.getFirst();
+      while (tuples.hasNext()) {
+        TupleWritable curTuple = tuples.next();
+        int curKeyId = curTuple.getInt(0);
+        if (curKeyId == 0) {
+          TupleWritable cachedTuple = new TupleWritable(3);
+          int simItem = curTuple.getInt(1);
+          double score = curTuple.getDouble(2);
+          cachedTuple.set(0, new VIntWritable(seenItem));
+          cachedTuple.set(1, new VIntWritable(simItem));
+          cachedTuple.set(2, new DoubleWritable(score));
+          cachedSimilarItems.add(cachedTuple);
+        } else {
+          // Encountered a tuple from the 'user' relation (ID=1); do the join
+          int userId = curTuple.getInt(1);
+          user.set(userId);
+          for (TupleWritable candItem : cachedSimilarItems) {
+            output.collect(user, candItem);
+            //  System.out.println("User:" + user + "\tSeen:" + candItem.getInt(0) +
+            //     "\tSimilar:" + candItem.getInt(1) + "\tScore:" + candItem.getDouble(2));
+          }
+        }
+      }
+      cachedSimilarItems.clear();
+    }
+
+  }
+
+  public JobConf prepareJob(Path userInputPath, Path itemInputPath, Path outputPath, int reducers) throws IOException {
+    JobConf job = new JobConf(getConf());
+    job.setJobName("User Item Joiner");
+    job.setJarByClass(this.getClass());
+
+    MultipleInputs.addInputPath(job, userInputPath, TextInputFormat.class, JoinUserMapper.class);
+    MultipleInputs.addInputPath(job, itemInputPath, SequenceFileInputFormat.class, JoinItemMapper.class);
+    job.setReducerClass(JoinItemUserReducer.class);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.setMapOutputKeyClass(Bigram.class);
+    job.setMapOutputValueClass(TupleWritable.class);
+    job.setOutputKeyClass(VIntWritable.class);
+    job.setOutputValueClass(TupleWritable.class);
+
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      SequenceFile.CompressionType.BLOCK);
+
+    job.setPartitionerClass(ItemSimilarityEstimator.FirstPartitioner.class);
+    job.setOutputValueGroupingComparator(Bigram.FirstGroupingComparator.class);
+
+    job.setNumReduceTasks(reducers);
+
+    return job;
+  }
+
+  @Override
+  public int run(String[] args) throws IOException {
+    // TODO use Commons CLI 2
+    if (args.length < 3) {
+      System.out
+          .println("UserItemJoiner <user-input-dirs> <item-input-dir> <output-dir>
"
+              + "[reducers]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    Path userInputPath = new Path(args[0]);
+    Path itemInputPath = new Path(args[1]);
+    Path outputPath = new Path(args[2]);
+    int reducers = args.length > 3 ? Integer.parseInt(args[3]) : 1;
+    JobConf jobConf = prepareJob(userInputPath, itemInputPath, outputPath, reducers);
+    RunningJob job = JobClient.runJob(jobConf);
+    return 0;
+  }
+
+}
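
As a reading aid (not part of the commit): JoinItemMapper tags its tuples with keyID 0 and JoinUserMapper with keyID 1, and both use (itemId, keyID) as the map output key. Because keys are sorted by the full Bigram but grouped only on the first component, a reducer processing item 100 sees keys in the order

    (100, 0)  item-similarity tuples for item 100
    (100, 1)  user tuples for users who have seen item 100

so all similar-item tuples are cached before the first user tuple triggers the join.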

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemRecommender.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemRecommender.java?rev=893615&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemRecommender.java
(added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/UserItemRecommender.java
Wed Dec 23 19:51:09 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.cf.taste.hadoop.cooccurence;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+public final class UserItemRecommender extends Configured implements Tool {
+
+  public static class RecommenderMapper extends MapReduceBase
+      implements Mapper<VIntWritable, TupleWritable, Bigram, TupleWritable> {
+
+    private final Bigram userSeenItem = new Bigram();
+
+    @Override
+    public void map(VIntWritable user, TupleWritable tuple, OutputCollector<Bigram, TupleWritable> output,
+                    Reporter reporter) throws IOException {
+      int userId = user.get();
+      int seenItemId = tuple.getInt(0);
+      userSeenItem.set(userId, seenItemId);
+      output.collect(userSeenItem, tuple);
+    }
+  }
+
+  public static class RecommenderReducer extends MapReduceBase
+      implements Reducer<Bigram, TupleWritable, Text, Text> {
+
+    private String fieldSeparator;
+    private int maxRecommendations;
+
+    private final List<Integer> seenItems = new ArrayList<Integer>();
+    private final Map<Integer, Double> recommendations = new HashMap<Integer, Double>();
+
+    private final Text user = new Text();
+    private final Text recomScore = new Text();
+
+    private static class EntryValueComparator implements Comparator<Map.Entry<Integer, Double>>, Serializable {
+
+      @Override
+      public int compare(Map.Entry<Integer, Double> itemScore1, Map.Entry<Integer, Double> itemScore2) {
+        Double value1 = itemScore1.getValue();
+        double val1 = (value1 == null) ? 0 : value1;
+        Double value2 = itemScore2.getValue();
+        double val2 = (value2 == null) ? 0 : value2;
+        return val2 > val1 ? 1 : -1;
+      }
+    }
+
+    @Override
+    public void configure(JobConf conf) {
+      fieldSeparator = conf.get("user.preference.field.separator", "\t");
+      maxRecommendations = conf.getInt("user.preference.max.recommendations", 100);
+    }
+
+    @Override
+    public void reduce(Bigram userSeenItem, Iterator<TupleWritable> candTupleItr, OutputCollector<Text, Text> output,
+                       Reporter reporter)
+        throws IOException {
+      int userId = userSeenItem.getFirst();
+      int prevSeenItem = userSeenItem.getSecond();
+
+      while (candTupleItr.hasNext()) {
+        TupleWritable tuple = candTupleItr.next();
+        int curSeenItem = tuple.getInt(0);
+        if (curSeenItem != prevSeenItem) {
+          seenItems.add(prevSeenItem);
+          recommendations.remove(prevSeenItem);
+          prevSeenItem = curSeenItem;
+        }
+        int candItem = tuple.getInt(1);
+        double score = tuple.getDouble(2);
+        if (Collections.binarySearch(seenItems, candItem) < 0) {
+          score = recommendations.containsKey(candItem) ? score + recommendations.get(candItem) : score;
+          recommendations.put(candItem, score);
+        } else {
+          recommendations.remove(candItem);
+        }
+      }
+      recommendations.remove(prevSeenItem);
+      //Sort recommendations by count and output top-N
+      outputSorted(userId, recommendations.entrySet(), output);
+    }
+
+    public void outputSorted(int userId, Collection<Map.Entry<Integer, Double>> recomSet, OutputCollector<Text, Text> output)
+        throws IOException {
+      user.set(String.valueOf(userId));
+      int N = maxRecommendations;
+      Collection<Map.Entry<Integer, Double>> sortedRecoms =
+          new TreeSet<Map.Entry<Integer, Double>>(new EntryValueComparator());
+      sortedRecoms.addAll(recomSet);
+      for (Map.Entry<Integer, Double> recommendation : sortedRecoms) {
+        recomScore.set(recommendation.getKey() + fieldSeparator + recommendation.getValue());
+        output.collect(user, recomScore);
+        N--;
+        if (N <= 0) {
+          break;
+        }
+      }
+      seenItems.clear();
+      recommendations.clear();
+    }
+  }
+
+  public JobConf prepareJob(String inputPaths, Path outputPath, int maxRecommendations, int reducers)
+      throws IOException {
+    JobConf job = new JobConf(getConf());
+    job.setJobName("User Item Recommendations");
+    job.setJarByClass(this.getClass());
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setMapperClass(RecommenderMapper.class);
+    job.setReducerClass(RecommenderReducer.class);
+
+    job.setMapOutputKeyClass(Bigram.class);
+    job.setMapOutputValueClass(TupleWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setPartitionerClass(ItemSimilarityEstimator.FirstPartitioner.class);
+    job.setOutputValueGroupingComparator(Bigram.FirstGroupingComparator.class);
+
+    job.setInt("user.preference.max.recommendations", maxRecommendations);
+    job.setNumReduceTasks(reducers);
+    FileInputFormat.addInputPaths(job, inputPaths);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    return job;
+  }
+
+  @Override
+  public int run(String[] args) throws IOException {
+    // TODO use Commons CLI 2
+    if (args.length < 2) {
+      System.out
+          .println("UserItemRecommender <input-dirs> <output-dir> "
+              + "[max-recommendations] [reducers]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    String inputPaths = args[0];
+    Path outputPath = new Path(args[1]);
+    int maxRecommendations = args.length > 2 ? Integer.parseInt(args[2]) : 100;
+    int reducers = args.length > 3 ? Integer.parseInt(args[3]) : 1;
+    JobConf jobConf = prepareJob(inputPaths, outputPath, maxRecommendations, reducers);
+    RunningJob job = JobClient.runJob(jobConf);
+    return 0;
+  }
+
+}
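
For illustration only (this sketch is not part of the commit; the paths and argument values are made-up assumptions): one way to chain the four jobs with ToolRunner, following each tool's usage string.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.ItemBigramGenerator;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.ItemSimilarityEstimator;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.UserItemJoiner;
    import org.apache.mahout.cf.taste.hadoop.cooccurence.UserItemRecommender;

    public class CooccurrenceRecommenderDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // <input-dir> <output-dir> [reducers]
        ToolRunner.run(conf, new ItemBigramGenerator(), new String[] {"prefs", "bigrams"});
        // <input-dirs> <output-dir> [max-frequent-items] [reducers]
        ToolRunner.run(conf, new ItemSimilarityEstimator(), new String[] {"bigrams", "similarities", "20", "1"});
        // <user-input-dirs> <item-input-dir> <output-dir> [reducers]
        ToolRunner.run(conf, new UserItemJoiner(), new String[] {"prefs", "similarities", "joined"});
        // <input-dirs> <output-dir> [max-recommendations] [reducers]
        ToolRunner.run(conf, new UserItemRecommender(), new String[] {"joined", "recommendations", "100", "1"});
      }
    }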


