mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject svn commit: r958517 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/hadoop/ main/java/org/apache/mahout/cf/taste/hadoop/similarity/ main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ main/java/org/apache/mahout/math/ha...
Date Mon, 28 Jun 2010 09:41:11 GMT
Author: srowen
Date: Mon Jun 28 09:41:09 2010
New Revision: 958517

URL: http://svn.apache.org/viewvc?rev=958517&view=rev
Log:
MAHOUT-418

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/AbstractDistributedVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedTanimotoCoefficientVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedUncenteredCosineVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedUncenteredZeroAssumingCosineVectorSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedVectorSimilarity.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedTanimotoCoefficientVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedUncenteredCosineVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedUncenteredZeroAssumingCosineVectorSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedVectorSimilarityTestCase.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CapSimilaritiesPerItemKeyWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CapSimilaritiesPerItemMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CapSimilaritiesPerItemReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/RemoveDuplicatesReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItemWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarityTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarityTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Helper methods for the Hadoop-related code in org.apache.mahout.cf.taste.
+ */
+public final class TasteHadoopUtils {
+
+  /** standard delimiter of textual preference data */
+  private static final Pattern PREFERENCE_TOKEN_DELIMITER = Pattern.compile("[\t,]");
+
+  /**
+   * Splits a preference data line into string tokens.
+   *
+   * @param line a line of textual preference data with fields separated by tab or comma
+   * @return the string tokens of the line
+   */
+  public static String[] splitPrefTokens(String line) {
+    return PREFERENCE_TOKEN_DELIMITER.split(line);
+  }
+
+  /** a path filter used to read files written by hadoop */
+  public static final PathFilter PARTS_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part-");
+    }
+  };
+
+  /**
+   * Maps a long ID to a non-negative int index.
+   *
+   * @param id the long ID to map
+   * @return a non-negative int derived by XOR-folding the high and low halves of the ID
+   */
+  public static int idToIndex(long id) {
+    return 0x7FFFFFFF & ((int) id ^ (int) (id >>> 32));
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java?rev=958517&r1=958516&r2=958517&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java Mon Jun 28 09:41:09 2010
@@ -18,11 +18,11 @@
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
 import java.io.IOException;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.math.VarLongWritable;
 
 /**
@@ -31,14 +31,12 @@ import org.apache.mahout.math.VarLongWri
 public class CountUsersMapper extends
     Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
 
-  private static final Pattern DELIMITER = Pattern.compile("[\t,]");
-
   @Override
   protected void map(LongWritable key,
                      Text value,
                      Context context) throws IOException, InterruptedException {
 
-    String[] tokens = DELIMITER.split(value.toString());
+    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
     long userID = Long.parseLong(tokens[0]);
 
     context.write(new CountUsersKeyWritable(userID), new VarLongWritable(userID));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=958517&r1=958516&r2=958517&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Mon Jun 28 09:41:09 2010
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -27,132 +27,77 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
-import org.apache.mahout.cf.taste.hadoop.ToUserPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
-import org.apache.mahout.cf.taste.hadoop.similarity.DistributedItemSimilarity;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
 
-/**
- * <p>Runs a completely distributed computation of the similarity of the itemvectors of the user-item-matrix
- *  as a series of mapreduces.</p>
- *
- * <p>Algorithm used is a slight modification from the algorithm described in
- * http://www.umiacs.umd.edu/~jimmylin/publications/Elsayed_etal_ACL2008_short.pdf</p>
- *
- * <pre>
- * Example using cosine distance:
- *
- * user-item-matrix:
- *
- *                  Game   Mouse    PC
- *          Peter     0       1      2
- *          Paul      1       0      1
- *
- * Input:
- *
- *  (Peter,Mouse,1)
- *  (Peter,PC,2)
- *  (Paul,Game,1)
- *  (Paul,PC,1)
- *
- * Step 1: Create the item-vectors
- *
- *  Game  -> (Paul,1)
- *  Mouse -> (Peter,1)
- *  PC    -> (Peter,2),(Paul,1)
- *
- * Step 2: Compute the length of the item vectors, store it with the item, create the user-vectors
- *
- *  Peter -> (Mouse,1,1),(PC,2.236,2)
- *  Paul  -> (Game,1,1),(PC,2.236,2)
- *
- * Step 3: Compute the pairwise cosine for all item pairs that have been co-rated by at least one user
- *
- *  Mouse,PC  -> 1 * 2 / (1 * 2.236)
- *  Game,PC   -> 1 * 1 / (1 * 2.236)
- *
- * </pre>
- *
- * <p>Command line arguments specific to this class are:</p>
- *
- * <ol>
- * <li>-Dmapred.input.dir=(path): Directory containing a text file containing the entries of the user-item-matrix in
- * the form userID,itemID,preference
- * computed, one per line</li>
- * <li>-Dmapred.output.dir=(path): output path where the computations output should go</li>
- * <li>--similarityClassname (classname): an implementation of {@link DistributedItemSimilarity} used to compute the
- * similarity</li>
- * <li>--maxSimilaritiesPerItem (integer): try to cap the number of similar items per item to this number
- * (default: 100)</li>
- * </ol>
- *
- *
- * <p>General command line options are documented in {@link AbstractJob}.</p>
- * <p>Please consider supplying a --tempDir parameter for this job, as it needs to write some intermediate files</p>
- *
- * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
- * arguments.</p>
- */
 public final class ItemSimilarityJob extends AbstractJob {
 
-  public static final String DISTRIBUTED_SIMILARITY_CLASSNAME =
-      "org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.distributedSimilarityClassname";
+  static final String ITEM_ID_INDEX_PATH_STR = ItemSimilarityJob.class.getName() + "itemIDIndexPathStr";
+  static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + "maxSimilarItemsPerItem";
 
-  public static final String NUMBER_OF_USERS =
-      "org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.numberOfUsers";
-
-  public static final String MAX_SIMILARITIES_PER_ITEM =
-      "org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.maxSimilaritiesPerItem";
-
-  private static final Integer DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
 
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new ItemSimilarityJob(), args);
+  }
+  
   @Override
-  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+  public int run(String[] args) throws Exception {
 
     addInputOption();
     addOutputOption();
     addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate");
     addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar items per item to this number " +
-    		"(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ")", String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
+        "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ")", String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
 
     Map<String,String> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
       return -1;
     }
 
-    String distributedSimilarityClassname = parsedArgs.get("--similarityClassname");
-    int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
+    String similarityClassName = parsedArgs.get("--similarityClassname");
+    int maxSimilarItemsPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
 
     Path inputPath = getInputPath();
     Path outputPath = getOutputPath();
     Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
 
+    Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
     Path countUsersPath = new Path(tempDirPath, "countUsers");
-    Path itemVectorsPath = new Path(tempDirPath, "itemVectors");
-    Path userVectorsPath = new Path(tempDirPath, "userVectors");
-    Path similaritiesPath = new Path(tempDirPath, "similarities");
-    Path cappedSimilaritiesPath = new Path(tempDirPath, "cappedSimilarities");
+    Path itemUserMatrixPath = new Path(tempDirPath, "itemUserMatrix");
+    Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
 
     AtomicInteger currentPhase = new AtomicInteger();
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      /* count all unique users */
+      Job itemIDIndex = prepareJob(
+        inputPath, itemIDIndexPath, TextInputFormat.class,
+        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
+        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
+        SequenceFileOutputFormat.class);
+      itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+      itemIDIndex.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       Job countUsers = prepareJob(inputPath,
                                   countUsersPath,
                                   TextInputFormat.class,
@@ -163,107 +108,58 @@ public final class ItemSimilarityJob ext
                                   VarIntWritable.class,
                                   NullWritable.class,
                                   TextOutputFormat.class);
-      countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
-      countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
-      countUsers.waitForCompletion(true);
+        countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
+        countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
+        countUsers.waitForCompletion(true);
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job itemVectors = prepareJob(inputPath,
-                                   itemVectorsPath,
-                                   TextInputFormat.class,
-                                   ToUserPrefsMapper.class,
-                                   VarLongWritable.class,
-                                   EntityPrefWritable.class,
-                                   ToItemVectorReducer.class,
-                                   VarLongWritable.class,
-                                   EntityPrefWritableArrayWritable.class,
-                                   SequenceFileOutputFormat.class);
-      itemVectors.waitForCompletion(true);
+      Job itemUserMatrix = prepareJob(inputPath,
+                                  itemUserMatrixPath,
+                                  TextInputFormat.class,
+                                  PrefsToItemUserMatrixMapper.class,
+                                  VarIntWritable.class,
+                                  DistributedRowMatrix.MatrixEntryWritable.class,
+                                  PrefsToItemUserMatrixReducer.class,
+                                  IntWritable.class,
+                                  VectorWritable.class,
+                                  SequenceFileOutputFormat.class);
+      itemUserMatrix.waitForCompletion(true);
     }
 
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job userVectors = prepareJob(itemVectorsPath,
-                                   userVectorsPath,
-                                   SequenceFileInputFormat.class,
-                                   PreferredItemsPerUserMapper.class,
-                                   VarLongWritable.class,
-                                   ItemPrefWithItemVectorWeightWritable.class,
-                                   PreferredItemsPerUserReducer.class,
-                                   VarLongWritable.class,
-                                   ItemPrefWithItemVectorWeightArrayWritable.class,
-                                   SequenceFileOutputFormat.class);
-      userVectors.getConfiguration().set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
-      userVectors.waitForCompletion(true);
-    }
+    int numberOfUsers = readNumberOfUsers(getConf(), countUsersPath);
+
+    /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
+     * new DistributedRowMatrix(...).rowSimilarity(...) */
+    RowSimilarityJob.main(new String[] { "-Dmapred.input.dir=" + itemUserMatrixPath.toString(),
+        "-Dmapred.output.dir=" + similarityMatrixPath.toString(), "--numberOfColumns", String.valueOf(numberOfUsers),
+        "--similarityClassname", similarityClassName, "--maxSimilaritiesPerRow",
+        String.valueOf(maxSimilarItemsPerItem + 1), "--tempDir", tempDirPath.toString() });
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job similarity = prepareJob(userVectorsPath,
-                                  similaritiesPath,
+      Job mostSimilarItems = prepareJob(similarityMatrixPath,
+                                  outputPath,
                                   SequenceFileInputFormat.class,
-                                  CopreferredItemsMapper.class,
-                                  ItemPairWritable.class,
-                                  CoRating.class,
-                                  SimilarityReducer.class,
+                                  MostSimilarItemPairsMapper.class,
                                   EntityEntityWritable.class,
                                   DoubleWritable.class,
-                                  SequenceFileOutputFormat.class);
-      Configuration conf = similarity.getConfiguration();
-      int numberOfUsers = readNumberOfUsers(conf, countUsersPath);
-      conf.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
-      conf.setInt(NUMBER_OF_USERS, numberOfUsers);
-      similarity.waitForCompletion(true);
-    }
-
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job capSimilaritiesPerItem = prepareJob(similaritiesPath,
-                                              cappedSimilaritiesPath,
-                                              SequenceFileInputFormat.class,
-                                              CapSimilaritiesPerItemMapper.class,
-                                              CapSimilaritiesPerItemKeyWritable.class,
-                                              SimilarItemWritable.class,
-                                              CapSimilaritiesPerItemReducer.class,
-                                              EntityEntityWritable.class,
-                                              DoubleWritable.class,
-                                              SequenceFileOutputFormat.class);
-
-      capSimilaritiesPerItem.getConfiguration().setInt(MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
-      capSimilaritiesPerItem.setPartitionerClass(
-          CapSimilaritiesPerItemKeyWritable.CapSimilaritiesPerItemKeyPartitioner.class);
-      capSimilaritiesPerItem.setGroupingComparatorClass(
-          CapSimilaritiesPerItemKeyWritable.CapSimilaritiesPerItemKeyGroupingComparator.class);
-      capSimilaritiesPerItem.waitForCompletion(true);
-    }
-
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job removeDuplicates = prepareJob(cappedSimilaritiesPath,
-                                        outputPath,
-                                        SequenceFileInputFormat.class,
-                                        Mapper.class,
-                                        EntityEntityWritable.class,
-                                        DoubleWritable.class,
-                                        RemoveDuplicatesReducer.class,
-                                        EntityEntityWritable.class,
-                                        DoubleWritable.class,
-                                        TextOutputFormat.class);
-      removeDuplicates.waitForCompletion(true);
+                                  MostSimilarItemPairsReducer.class,
+                                  EntityEntityWritable.class,
+                                  DoubleWritable.class,
+                                  TextOutputFormat.class);
+      Configuration mostSimilarItemsConf = mostSimilarItems.getConfiguration();
+      mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR, itemIDIndexPath.toString());
+      mostSimilarItemsConf.setInt(MAX_SIMILARITIES_PER_ITEM, maxSimilarItemsPerItem);
+      mostSimilarItems.setCombinerClass(MostSimilarItemPairsReducer.class);
+      mostSimilarItems.waitForCompletion(true);
     }
 
     return 0;
   }
 
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new ItemSimilarityJob(), args);
-  }
-
   static int readNumberOfUsers(Configuration conf, Path outputDir) throws IOException {
     FileSystem fs = FileSystem.get(conf);
-    Path outputFile = fs.listStatus(outputDir, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().startsWith("part-");
-      }
-    })[0].getPath();
+    Path outputFile = fs.listStatus(outputDir, TasteHadoopUtils.PARTS_FILTER)[0].getPath();
     InputStream in = null;
     try  {
       in = fs.open(outputFile);
@@ -274,17 +170,4 @@ public final class ItemSimilarityJob ext
       IOUtils.closeStream(in);
     }
   }
-
-  static DistributedItemSimilarity instantiateSimilarity(String classname) {
-    try {
-      return (DistributedItemSimilarity) Class.forName(classname).newInstance();
-    } catch (ClassNotFoundException cnfe) {
-      throw new IllegalStateException(cnfe);
-    } catch (InstantiationException ie) {
-      throw new IllegalStateException(ie);
-    } catch (IllegalAccessException iae) {
-      throw new IllegalStateException(iae);
-    }
-  }
-
 }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+
+public class MostSimilarItemPairsMapper
+    extends Mapper<IntWritable,VectorWritable,EntityEntityWritable,DoubleWritable> {
+
+  private OpenIntLongHashMap indexItemIDMap;
+  private int maxSimilarItemsPerItem;
+
+  @Override
+  protected void setup(Context ctx) {
+    Configuration conf = ctx.getConfiguration();
+    String itemIDIndexPathStr = conf.get(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR);
+    maxSimilarItemsPerItem = conf.getInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, -1);
+    if (maxSimilarItemsPerItem < 1) {
+      throw new IllegalStateException("maxSimilarItemsPerItem was not correctly set!");
+    }
+
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
+      indexItemIDMap = new OpenIntLongHashMap();
+      VarIntWritable index = new VarIntWritable();
+      VarLongWritable id = new VarLongWritable();
+      for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
+        String path = status.getPath().toString();
+        SequenceFile.Reader reader =
+            new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
+        while (reader.next(index, id)) {
+          indexItemIDMap.put(index.get(), id.get());
+        }
+        reader.close();
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  protected void map(IntWritable itemIDIndexWritable, VectorWritable similarityVector, Context ctx)
+      throws IOException, InterruptedException {
+
+    int itemIDIndex = itemIDIndexWritable.get();
+
+    Queue<SimilarItem> topMostSimilarItems = new PriorityQueue<SimilarItem>(maxSimilarItemsPerItem + 1,
+        Collections.reverseOrder(SimilarItem.COMPARE_BY_SIMILARITY));
+
+    Iterator<Element> similarityVectorIterator = similarityVector.get().iterateNonZero();
+
+    while (similarityVectorIterator.hasNext()) {
+      Vector.Element element = similarityVectorIterator.next();
+      int index = element.index();
+      double value = element.get();
+      /* ignore self similarities */
+      if (index != itemIDIndex) {
+        if (topMostSimilarItems.size() < maxSimilarItemsPerItem) {
+          topMostSimilarItems.add(new SimilarItem(indexItemIDMap.get(index), value));
+        } else if (value > topMostSimilarItems.peek().getSimilarity()) {
+          topMostSimilarItems.add(new SimilarItem(indexItemIDMap.get(index), value));
+          topMostSimilarItems.poll();
+        }
+      }
+    }
+
+    if (!topMostSimilarItems.isEmpty()) {
+      List<SimilarItem> mostSimilarItems = new ArrayList<SimilarItem>(topMostSimilarItems.size());
+      mostSimilarItems.addAll(topMostSimilarItems);
+      Collections.sort(mostSimilarItems, SimilarItem.COMPARE_BY_SIMILARITY);
+
+      long itemID = indexItemIDMap.get(itemIDIndex);
+      for (SimilarItem similarItem : mostSimilarItems) {
+       long otherItemID = similarItem.getItemID();
+       if (itemID < otherItemID) {
+         ctx.write(new EntityEntityWritable(itemID, otherItemID), new DoubleWritable(similarItem.getSimilarity()));
+       } else {
+         ctx.write(new EntityEntityWritable(otherItemID, itemID), new DoubleWritable(similarItem.getSimilarity()));
+       }
+      }
+    }
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsReducer.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsReducer.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+
+public class MostSimilarItemPairsReducer
+    extends Reducer<EntityEntityWritable,DoubleWritable,EntityEntityWritable,DoubleWritable> {
+
+  @Override
+  protected void reduce(EntityEntityWritable itemIDPair, Iterable<DoubleWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+    ctx.write(itemIDPair, values.iterator().next());
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * creates an item-user-matrix entry from a preference, replacing userID and itemID with int indices
+ */
+public class PrefsToItemUserMatrixMapper
+    extends Mapper<LongWritable,Text,VarIntWritable,DistributedRowMatrix.MatrixEntryWritable> {
+
+  @Override
+  protected void map(LongWritable key, Text value, Context ctx)
+      throws IOException, InterruptedException {
+
+    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
+    long userID = Long.parseLong(tokens[0]);
+    long itemID = Long.parseLong(tokens[1]);
+    float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
+
+    int row = TasteHadoopUtils.idToIndex(itemID);
+    int column = TasteHadoopUtils.idToIndex(userID);
+
+    DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
+    entry.setRow(row);
+    entry.setCol(column);
+    entry.setVal(prefValue);
+
+    ctx.write(new VarIntWritable(row), entry);
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+
+/**
+ * creates matrix rows ({@link VectorWritable}s) from the {@link MatrixEntryWritable}s
+ */
+public class PrefsToItemUserMatrixReducer
+    extends Reducer<VarIntWritable,DistributedRowMatrix.MatrixEntryWritable,IntWritable,VectorWritable> {
+
+  @Override
+  protected void reduce(VarIntWritable rowIndex, Iterable<DistributedRowMatrix.MatrixEntryWritable> entries,
+      Context ctx) throws IOException, InterruptedException {
+    Vector row = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    for (MatrixEntryWritable entry : entries) {
+      row.setQuick(entry.getCol(), entry.getVal());
+    }
+    ctx.write(new IntWritable(rowIndex.get()), new VectorWritable(row));
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItem.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItem.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarItem.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.util.Comparator;
+
/**
 * Immutable value object pairing an item ID with its similarity to some other (implicit) item.
 */
class SimilarItem {

  /** orders {@link SimilarItem}s by ascending similarity value */
  static final Comparator<SimilarItem> COMPARE_BY_SIMILARITY = new BySimilaritySimilarItemComparator();

  private final long itemID;
  private final double similarity;

  public SimilarItem(long itemID, double similarity) {
    this.itemID = itemID;
    this.similarity = similarity;
  }

  public long getItemID() {
    return itemID;
  }

  public double getSimilarity() {
    return similarity;
  }

  static class BySimilaritySimilarItemComparator implements Comparator<SimilarItem> {
    @Override
    public int compare(SimilarItem s1, SimilarItem s2) {
      // Double.compare yields a consistent total order (handles NaN and -0.0),
      // unlike the previous chained primitive comparisons
      return Double.compare(s1.similarity, s2.similarity);
    }
  }
}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+
/**
 * a pair of entries in the same column of a row vector where each of the entries' values is != NaN
 *
 * <p>Used as a MapReduce value during pairwise similarity computation: carries the shared
 * column index plus the values of the two co-occurring entries (one from each row of the
 * pair). The serialization format (varint column, then two doubles) must stay stable between
 * {@link #write(DataOutput)} and {@link #readFields(DataInput)}.</p>
 */
public class Cooccurrence implements Writable {

  private int column;    // column index shared by both entries
  private double valueA; // value of the first row's entry in that column
  private double valueB; // value of the second row's entry in that column

  /** no-arg constructor required by Hadoop's Writable deserialization machinery */
  public Cooccurrence() {
    super();
  }

  public Cooccurrence(int column, double valueA, double valueB) {
    super();
    this.column = column;
    this.valueA = valueA;
    this.valueB = valueB;
  }

  /** mutator allowing instance reuse across map() calls, avoiding per-record allocation */
  public void set(int column, double valueA, double valueB) {
    this.column = column;
    this.valueA = valueA;
    this.valueB = valueB;
  }

  public int getColumn() {
    return column;
  }
  public double getValueA() {
    return valueA;
  }
  public double getValueB() {
    return valueB;
  }

  /** deserializes in the exact field order used by {@link #write(DataOutput)} */
  @Override
  public void readFields(DataInput in) throws IOException {
    column = Varint.readSignedVarInt(in);
    valueA = in.readDouble();
    valueB = in.readDouble();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    Varint.writeSignedVarInt(column, out);
    out.writeDouble(valueA);
    out.writeDouble(valueB);
  }

  @Override
  public int hashCode() {
    return column;
  }

  // NOTE(review): equality considers only the column -- two Cooccurrences with the same column
  // but different values compare equal (consistent with hashCode above). This may be intentional
  // for grouping, but verify that no caller relies on value-sensitive equality.
  @Override
  public boolean equals(Object other) {
    if (other instanceof Cooccurrence) {
      return column == ((Cooccurrence)other).column;
    }
    return false;
  }
}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.hadoop.similarity.vector.DistributedVectorSimilarity;
+
/**
 * <p>Runs a completely distributed computation of the pairwise similarity of the row vectors of a
 * {@link DistributedRowMatrix} as a series of mapreduces.</p>
 *
 * <p>The algorithm used is a slight modification from the algorithm described in
 * http://www.umiacs.umd.edu/~jimmylin/publications/Elsayed_etal_ACL2008_short.pdf</p>
 *
 *
 * <p>Command line arguments specific to this class are:</p>
 *
 * <ol>
 * <li>-Dmapred.input.dir=(path): Directory containing a {@link DistributedRowMatrix} as a
 * {@code SequenceFile<IntWritable,VectorWritable>}</li>
 * <li>-Dmapred.output.dir=(path): output path where the computations output should go (a {@link DistributedRowMatrix}
 * stored as a {@code SequenceFile<IntWritable,VectorWritable>})</li>
 * <li>--numberOfColumns: the number of columns in the input matrix</li>
 * <li>--similarityClassname (classname): an implementation of {@link DistributedVectorSimilarity} used to compute the
 * similarity</li>
 * <li>--maxSimilaritiesPerRow (integer): cap the number of similar rows per row to this number (default: 100)</li>
 * </ol>
 *
 *
 * <p>General command line options are documented in {@link AbstractJob}.</p>
 * <p>Please consider supplying a --tempDir parameter for this job, as it needs to write some intermediate files</p>
 *
 * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
 * arguments.</p>
 */
public class RowSimilarityJob extends AbstractJob {

  /** configuration key holding the class name of the {@link DistributedVectorSimilarity} to use */
  public static final String DISTRIBUTED_SIMILARITY_CLASSNAME =
      RowSimilarityJob.class.getName() + ".distributedSimilarityClassname";
  // NOTE(review): unlike DISTRIBUTED_SIMILARITY_CLASSNAME, the two keys below are missing the
  // '.' separator, and NUMBER_OF_COLUMNS reuses the suffix "numberOfRows". Functionally harmless
  // since writers and readers share the constants, but worth cleaning up in a follow-up;
  // changing the values would break externally-written configurations, so only flagged here.
  public static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class.getName() + "numberOfRows";
  public static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class.getName() + "maxSimilaritiesPerRow";

  private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;

  public static void main(String[] args) throws Exception {
    // NOTE(review): the exit code returned by ToolRunner.run is discarded; consider
    // passing it to System.exit so failures are visible to calling scripts
    ToolRunner.run(new RowSimilarityJob(), args);
  }

  /**
   * Parses the command line and runs up to three chained mapreduce phases:
   * row weights, pairwise similarities, and assembly of the similarity matrix.
   */
  @Override
  public int run(String[] args) throws Exception {

    addInputOption();
    addOutputOption();
    addOption("numberOfColumns", "r", "Number of columns in the input matrix");
    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate");
    addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: " +
        DEFAULT_MAX_SIMILARITIES_PER_ROW + ")", String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));

    Map<String,String> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
      // argument parsing failed; usage has been printed by parseArguments
      return -1;
    }

    int numberOfColumns = Integer.parseInt(parsedArgs.get("--numberOfColumns"));
    String distributedSimilarityClassname = parsedArgs.get("--similarityClassname");
    int maxSimilaritiesPerRow = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerRow"));

    Path inputPath = getInputPath();
    Path outputPath = getOutputPath();
    // assumes parseArguments supplies a default --tempDir when none was given -- TODO confirm,
    // otherwise this throws an unhelpful NullPointerException
    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));

    Path weightsPath = new Path(tempDirPath, "weights");
    Path pairwiseSimilarityPath = new Path(tempDirPath, "pairwiseSimilarity");

    AtomicInteger currentPhase = new AtomicInteger();

    // phase 1: compute a per-row weight and group the weighted entries by column
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job weights = prepareJob(inputPath,
                               weightsPath,
                               SequenceFileInputFormat.class,
                               RowWeightMapper.class,
                               VarIntWritable.class,
                               WeightedOccurrence.class,
                               WeightedOccurrencesPerColumnReducer.class,
                               VarIntWritable.class,
                               WeightedOccurrenceArray.class,
                               SequenceFileOutputFormat.class);

      weights.getConfiguration().set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
      weights.waitForCompletion(true);
    }

    // phase 2: emit all pairwise cooccurrences per column and reduce them to
    // similarity-matrix entries
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath,
                               pairwiseSimilarityPath,
                               SequenceFileInputFormat.class,
                               CooccurrencesMapper.class,
                               WeightedRowPair.class,
                               Cooccurrence.class,
                               SimilarityReducer.class,
                               SimilarityMatrixEntryKey.class,
                               DistributedRowMatrix.MatrixEntryWritable.class,
                               SequenceFileOutputFormat.class);

      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseSimilarity.waitForCompletion(true);
    }

    // phase 3: group entries by row (secondary-sorted by descending similarity via the custom
    // partitioner/grouping comparator) and assemble the rows of the similarity matrix
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath,
                               outputPath,
                               SequenceFileInputFormat.class,
                               Mapper.class,
                               SimilarityMatrixEntryKey.class,
                               DistributedRowMatrix.MatrixEntryWritable.class,
                               EntriesToVectorsReducer.class,
                               IntWritable.class,
                               VectorWritable.class,
                               SequenceFileOutputFormat.class);
      asMatrix.setPartitionerClass(SimilarityMatrixEntryKey.SimilarityMatrixEntryKeyPartitioner.class);
      asMatrix.setGroupingComparatorClass(SimilarityMatrixEntryKey.SimilarityMatrixEntryKeyGroupingComparator.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      asMatrix.waitForCompletion(true);
    }

    return 0;
  }

  /**
   * Reflectively instantiates the {@link DistributedVectorSimilarity} implementation named in
   * the job configuration, wrapping any reflection failure in an unchecked exception.
   */
  static DistributedVectorSimilarity instantiateSimilarity(String classname) {
    try {
      return (DistributedVectorSimilarity) Class.forName(classname).newInstance();
    } catch (ClassNotFoundException cnfe) {
      throw new IllegalStateException(cnfe);
    } catch (InstantiationException ie) {
      throw new IllegalStateException(ie);
    } catch (IllegalAccessException iae) {
      throw new IllegalStateException(iae);
    }
  }

  /**
   * applies {@link DistributedVectorSimilarity#weight(Vector)} to each row of the input matrix
   *
   * Emits every nonzero entry keyed by its column index, bundled with the row index, the
   * entry's value, and the row's weight.
   */
  public static class RowWeightMapper extends Mapper<IntWritable,VectorWritable,VarIntWritable,WeightedOccurrence> {

    private DistributedVectorSimilarity similarity;

    @Override
    protected void setup(Context ctx) throws IOException, InterruptedException {
      super.setup(ctx);
      similarity = RowSimilarityJob.instantiateSimilarity(ctx.getConfiguration().get(DISTRIBUTED_SIMILARITY_CLASSNAME));
    }

    @Override
    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
        throws IOException, InterruptedException {

      Vector v = vectorWritable.get();
      // one weight per row, shared by all of the row's entries
      double weight = similarity.weight(v);

      Iterator<Element> elementsIterator = v.iterateNonZero();
      while (elementsIterator.hasNext()) {
        Element element = elementsIterator.next();
        int column = element.index();
        double value = element.get();
        ctx.write(new VarIntWritable(column), new WeightedOccurrence(row.get(), value, weight));
      }
    }
  }

  /**
   * collects all {@link WeightedOccurrence}s for a column and writes them to a {@link WeightedOccurrenceArray}
   */
  public static class WeightedOccurrencesPerColumnReducer
      extends Reducer<VarIntWritable,WeightedOccurrence,VarIntWritable,WeightedOccurrenceArray> {

    @Override
    protected void reduce(VarIntWritable column, Iterable<WeightedOccurrence> weightedOccurrences, Context ctx)
        throws IOException, InterruptedException {

      // clone() guards against Hadoop reusing the Writable instance across iterations;
      // collecting into a Set collapses duplicates per WeightedOccurrence equality --
      // presumably row indices are unique per column, verify against WeightedOccurrence
      Set<WeightedOccurrence> collectedWeightedOccurrences = new HashSet<WeightedOccurrence>();
      for (WeightedOccurrence weightedOccurrence : weightedOccurrences) {
        collectedWeightedOccurrences.add(weightedOccurrence.clone());
      }

      ctx.write(column, new WeightedOccurrenceArray(collectedWeightedOccurrences.toArray(
          new WeightedOccurrence[collectedWeightedOccurrences.size()])));
    }
  }

  /**
   * maps all pairs of weighted entries of a column vector
   */
  public static class CooccurrencesMapper
      extends Mapper<VarIntWritable,WeightedOccurrenceArray,WeightedRowPair,Cooccurrence> {

    @Override
    protected void map(VarIntWritable column, WeightedOccurrenceArray weightedOccurrenceArray, Context ctx)
        throws IOException, InterruptedException {

      WeightedOccurrence[] weightedOccurrences = weightedOccurrenceArray.getWeightedOccurrences();

      // key/value instances are reused across writes to avoid per-pair allocation
      WeightedRowPair rowPair = new WeightedRowPair();
      Cooccurrence coocurrence = new Cooccurrence();

      for (int n = 0; n < weightedOccurrences.length; n++) {
        int rowA = weightedOccurrences[n].getRow();
        double weightA = weightedOccurrences[n].getWeight();
        double valueA = weightedOccurrences[n].getValue();
        // inner loop starts at n, so self-pairs (m == n) are included as well
        for (int m = n; m < weightedOccurrences.length; m++) {
          int rowB = weightedOccurrences[m].getRow();
          double weightB = weightedOccurrences[m].getWeight();
          double valueB = weightedOccurrences[m].getValue();
          rowPair.set(rowA, rowB, weightA, weightB);
          coocurrence.set(column.get(), valueA, valueB);
          ctx.write(rowPair, coocurrence);
        }
      }
    }
  }

  /**
   * computes the pairwise similarities
   *
   * Each similarity value is emitted for both (rowA, rowB) and (rowB, rowA), except for
   * identical rows, so the resulting matrix is symmetric.
   */
  public static class SimilarityReducer
      extends Reducer<WeightedRowPair,Cooccurrence,SimilarityMatrixEntryKey,DistributedRowMatrix.MatrixEntryWritable> {

    private DistributedVectorSimilarity similarity;
    private int numberOfColumns;

    @Override
    protected void setup(Context ctx) throws IOException, InterruptedException {
      super.setup(ctx);
      similarity = RowSimilarityJob.instantiateSimilarity(ctx.getConfiguration().get(DISTRIBUTED_SIMILARITY_CLASSNAME));
      numberOfColumns = ctx.getConfiguration().getInt(NUMBER_OF_COLUMNS, -1);
      if (numberOfColumns < 1) {
        throw new IllegalStateException("Number of columns was not correctly set!");
      }
    }

    @Override
    protected void reduce(WeightedRowPair rowPair, Iterable<Cooccurrence> cooccurrences, Context ctx)
        throws IOException, InterruptedException {

      int rowA = rowPair.getRowA();
      int rowB = rowPair.getRowB();
      double similarityValue = similarity.similarity(rowA, rowB, cooccurrences, rowPair.getWeightA(),
          rowPair.getWeightB(), numberOfColumns);

      // NaN signals "no similarity defined" and is dropped entirely
      if (!Double.isNaN(similarityValue)) {
        SimilarityMatrixEntryKey key = new SimilarityMatrixEntryKey();
        DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
        entry.setVal(similarityValue);

        entry.setRow(rowA);
        entry.setCol(rowB);
        key.set(rowA, similarityValue);
        ctx.write(key, entry);

        // mirror the entry for the symmetric position, except on the diagonal
        if (rowA != rowB) {
          entry.setRow(rowB);
          entry.setCol(rowA);
          key.set(rowB, similarityValue);
          ctx.write(key, entry);
        }
      }
    }
  }

  /**
   * collects all {@link MatrixEntryWritable} for each column and creates a {@link VectorWritable}
   *
   * Relies on the secondary sort provided by {@link SimilarityMatrixEntryKey}: entries arrive
   * in descending order of similarity, so keeping only the first maxSimilaritiesPerRow entries
   * retains the strongest similarities per row.
   */
  public static class EntriesToVectorsReducer
      extends Reducer<SimilarityMatrixEntryKey,DistributedRowMatrix.MatrixEntryWritable,IntWritable,VectorWritable> {

    private int maxSimilaritiesPerRow;

    @Override
    protected void setup(Context ctx) throws IOException, InterruptedException {
      super.setup(ctx);
      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, -1);
      if (maxSimilaritiesPerRow < 1) {
        throw new IllegalStateException("Maximum number of similarities per row was not correctly set!");
      }
    }

    @Override
    protected void reduce(SimilarityMatrixEntryKey key, Iterable<MatrixEntryWritable> entries, Context ctx)
        throws IOException, InterruptedException {
      RandomAccessSparseVector temporaryVector = new RandomAccessSparseVector(Integer.MAX_VALUE, maxSimilaritiesPerRow);
      int similaritiesSet = 0;
      for (MatrixEntryWritable entry : entries) {
        temporaryVector.setQuick(entry.getCol(), entry.getVal());
        // stop after the cap; remaining entries have lower similarity values
        if (++similaritiesSet == maxSimilaritiesPerRow) {
          break;
        }
      }
      // convert to a sequential-access vector for compact storage and fast iteration
      SequentialAccessSparseVector vector = new SequentialAccessSparseVector(temporaryVector);
      ctx.write(new IntWritable(key.getRow()), new VectorWritable(vector));
    }
  }

}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob.EntriesToVectorsReducer;
+
+/**
+ * used as key for the {@link EntriesToVectorsReducer} to collect all rows similar to the specified row
+ *
+ * ensures that the similarity matrix entries for a row are seen in descending order
+ * by their similarity value via secondary sort
+ */
+public class SimilarityMatrixEntryKey implements WritableComparable<SimilarityMatrixEntryKey> {
+
+  private int row;
+  private double value;
+
+  static {
+    WritableComparator.define(SimilarityMatrixEntryKey.class, new SimilarityMatrixEntryKeyComparator());
+  }
+
+  public SimilarityMatrixEntryKey() {
+    super();
+  }
+
+  public SimilarityMatrixEntryKey(int row, double value) {
+    super();
+    this.row = row;
+    this.value = value;
+  }
+
+  public void set(int row, double value) {
+    this.row = row;
+    this.value = value;
+  }
+
+  public int getRow() {
+    return row;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    row = Varint.readSignedVarInt(in);
+    value = in.readDouble();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeSignedVarInt(row, out);
+    out.writeDouble(value);
+  }
+
+  @Override
+  public int compareTo(SimilarityMatrixEntryKey other) {
+    return (row == other.row) ? 0 : (row < other.row) ? -1 : 1;
+  }
+
+  @Override
+  public int hashCode() {
+    return row;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof SimilarityMatrixEntryKey) {
+      return row == ((SimilarityMatrixEntryKey)other).row;
+    }
+    return false;
+  }
+
+  public static class SimilarityMatrixEntryKeyComparator extends WritableComparator {
+
+    protected SimilarityMatrixEntryKeyComparator() {
+      super(SimilarityMatrixEntryKey.class, true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      SimilarityMatrixEntryKey key1 = (SimilarityMatrixEntryKey) a;
+      SimilarityMatrixEntryKey key2 = (SimilarityMatrixEntryKey) b;
+
+      int result = compare(key1.row, key2.row);
+      if (result == 0) {
+        result = -1 * compare(key1.value, key2.value);
+      }
+      return result;
+    }
+
+    protected static int compare(long a, long b) {
+      return (a == b) ? 0 : (a < b) ? -1 : 1;
+    }
+
+    protected static int compare(double a, double b) {
+      return (a == b) ? 0 : (a < b) ? -1 : 1;
+    }
+  }
+
+  public static class SimilarityMatrixEntryKeyPartitioner
+      extends Partitioner<SimilarityMatrixEntryKey,MatrixEntryWritable> {
+    @Override
+    public int getPartition(SimilarityMatrixEntryKey key, MatrixEntryWritable value, int numPartitions) {
+      return (key.hashCode() * 127) % numPartitions;
+    }
+  }
+
+  public static class SimilarityMatrixEntryKeyGroupingComparator extends WritableComparator {
+
+    protected SimilarityMatrixEntryKeyGroupingComparator() {
+      super(SimilarityMatrixEntryKey.class, true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      return a.compareTo(b);
+    }
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+
+/**
+ * an entry in a row vector stored together with a precomputed weight of the row
+ */
+class WeightedOccurrence implements Writable, Cloneable {
+
+  private int row;
+  private double value;
+  private double weight;
+
+  public WeightedOccurrence() {
+    super();
+  }
+
+  public WeightedOccurrence(int row, double value, double weight) {
+    super();
+    this.row = row;
+    this.value = value;
+    this.weight = weight;
+  }
+
+  public int getRow() {
+    return row;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  @Override
+  public WeightedOccurrence clone() {
+    return new WeightedOccurrence(row, value, weight);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    row = Varint.readSignedVarInt(in);
+    value = in.readDouble();
+    weight = in.readDouble();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeSignedVarInt(row, out);
+    out.writeDouble(value);
+    out.writeDouble(weight);
+  }
+
+  @Override
+  public int hashCode() {
+    return row;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof WeightedOccurrence) {
+      return row == ((WeightedOccurrence)other).row;
+    }
+    return false;
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+/**
+ * an array of {@link WeightedOccurrence}s
+ */
+class WeightedOccurrenceArray extends ArrayWritable {
+
+  public WeightedOccurrenceArray() {
+    super(WeightedOccurrence.class);
+  }
+
+  public WeightedOccurrenceArray(WeightedOccurrence[] weightedOccurrences) {
+    super(WeightedOccurrence.class);
+    set(weightedOccurrences);
+  }
+
+  public WeightedOccurrence[] getWeightedOccurrences() {
+    return (WeightedOccurrence[]) toArray();
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.math.Varint;
+
+/**
+ * a pair of row vectors that has at least one entry != NaN in the same column together with the precomputed weights of
+ * the row vectors
+ */
+public class WeightedRowPair implements WritableComparable<WeightedRowPair> {
+
+  private int rowA;
+  private int rowB;
+  private double weightA;
+  private double weightB;
+
+  public WeightedRowPair() {
+    super();
+  }
+
+  public WeightedRowPair(int rowA, int rowB, double weightA, double weightB) {
+    super();
+    this.rowA = rowA;
+    this.rowB = rowB;
+    this.weightA = weightA;
+    this.weightB = weightB;
+  }
+
+  public void set(int rowA, int rowB, double weightA, double weightB) {
+    this.rowA = rowA;
+    this.rowB = rowB;
+    this.weightA = weightA;
+    this.weightB = weightB;
+  }
+
+  public int getRowA() {
+    return rowA;
+  }
+  public int getRowB() {
+    return rowB;
+  }
+  public double getWeightA() {
+    return weightA;
+  }
+  public double getWeightB() {
+    return weightB;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    rowA = Varint.readSignedVarInt(in);
+    rowB = Varint.readSignedVarInt(in);
+    weightA = in.readDouble();
+    weightB = in.readDouble();
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeSignedVarInt(rowA, out);
+    Varint.writeSignedVarInt(rowB, out);
+    out.writeDouble(weightA);
+    out.writeDouble(weightB);
+  }
+  @Override
+  public int compareTo(WeightedRowPair other) {
+    int result = compare(rowA, other.rowA);
+    if (result == 0) {
+      result = compare(rowB, other.rowB);
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return rowA + 31 * rowB;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof WeightedRowPair) {
+      WeightedRowPair otherPair = (WeightedRowPair) other;
+      return (rowA == otherPair.rowA && rowB == otherPair.rowB);
+    }
+    return false;
+  }
+
+  protected static int compare(int a, int b) {
+    return (a == b) ? 0 : (a < b) ? -1 : 1;
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/AbstractDistributedVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/AbstractDistributedVectorSimilarity.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/AbstractDistributedVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/AbstractDistributedVectorSimilarity.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import java.util.Iterator;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * abstract base implementation of {@link DistributedVectorSimilarity}
+ */
+public abstract class AbstractDistributedVectorSimilarity implements DistributedVectorSimilarity {
+
+  /**
+   * ensures that the computed similarity is in [-1,1]
+   */
+  @Override
+  public final double similarity(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+      double weightOfVectorB, int numberOfColumns) {
+
+    double result = doComputeResult(rowA, rowB, cooccurrences, weightOfVectorA, weightOfVectorB, numberOfColumns);
+
+    if (result < -1.0) {
+      result = -1.0;
+    } else if (result > 1.0) {
+      result = 1.0;
+    }
+    return result;
+  }
+
+  /**
+   * computes the number of elements in the {@link Iterable}
+   *
+   * @param iterable
+   * @return
+   */
+  protected int countElements(Iterable<?> iterable) {
+    return countElements(iterable.iterator());
+  }
+
+  /**
+   * computes the number of elements in the {@link Iterator}
+   *
+   * @param iterator
+   * @return
+   */
+  protected int countElements(Iterator<?> iterator) {
+    int count = 0;
+    while (iterator.hasNext()) {
+      count++;
+      iterator.next();
+    }
+    return count;
+  }
+
+  /**
+   * do the actual similarity computation
+   *
+   * @see DistributedVectorSimilarity#similarity(int, int, Iterable, double, double, int)
+   *
+   * @param rowA
+   * @param rowB
+   * @param cooccurrences
+   * @param weightOfVectorA
+   * @param weightOfVectorB
+   * @param numberOfColumns
+   * @return
+   */
+  protected abstract double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences,
+      double weightOfVectorA, double weightOfVectorB, int numberOfColumns);
+
+  /**
+   * vectors have no weight (NaN) by default, subclasses may override this
+   */
+  @Override
+  public double weight(Vector v) {
+    return Double.NaN;
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarity.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedEuclideanDistanceVectorSimilarity.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * distributed implementation of euclidean distance as vector similarity measure
+ */
+public class DistributedEuclideanDistanceVectorSimilarity extends AbstractDistributedVectorSimilarity {
+
+  @Override
+  protected double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+      double weightOfVectorB, int numberOfColumns) {
+
+    double n = 0.0;
+    double sumXYdiff2 = 0.0;
+
+    for (Cooccurrence cooccurrence : cooccurrences) {
+      double diff = cooccurrence.getValueA() - cooccurrence.getValueB();
+      sumXYdiff2 += diff * diff;
+      n++;
+    }
+
+    return n / (1.0 + Math.sqrt(sumXYdiff2));
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarity.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedLoglikelihoodVectorSimilarity.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * distributed implementation of loglikelihood as vector similarity measure
+ */
+public class DistributedLoglikelihoodVectorSimilarity extends
+    AbstractDistributedVectorSimilarity {
+
+  @Override
+  protected double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+      double weightOfVectorB, int numberOfColumns) {
+
+    int cooccurrenceCount = countElements(cooccurrences);
+    if (cooccurrenceCount == 0) {
+      return Double.NaN;
+    }
+
+    int occurrencesA = (int) weightOfVectorA;
+    int occurrencesB = (int) weightOfVectorB;
+
+    double logLikelihood = twoLogLambda(cooccurrenceCount,
+                                        occurrencesA - cooccurrenceCount,
+                                        occurrencesB,
+                                        numberOfColumns - occurrencesB);
+
+    return 1.0 - 1.0 / (1.0 + logLikelihood);
+  }
+
+  @Override
+  public double weight(Vector v) {
+    return (double) countElements(v.iterateNonZero());
+  }
+
+  private static double twoLogLambda(double k1, double k2, double n1, double n2) {
+    double p = (k1 + k2) / (n1 + n2);
+    return 2.0 * (logL(k1 / n1, k1, n1)
+                  + logL(k2 / n2, k2, n2)
+                  - logL(p, k1, n1)
+                  - logL(p, k2, n2));
+  }
+
+  private static double logL(double p, double k, double n) {
+    return k * safeLog(p) + (n - k) * safeLog(1.0 - p);
+  }
+
+  private static double safeLog(double d) {
+    return d <= 0.0 ? 0.0 : Math.log(d);
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarity.java?rev=958517&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedPearsonCorrelationVectorSimilarity.java Mon Jun 28 09:41:09 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * distributed implementation of the pearson correlation
+ */
+public class DistributedPearsonCorrelationVectorSimilarity extends AbstractDistributedVectorSimilarity {
+
+  @Override
+  protected double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+      double weightOfVectorB, int numberOfColumns) {
+
+    int count = 0;
+    double sumX = 0.0;
+    double sumY = 0.0;
+    double sumXY = 0.0;
+    double sumX2 = 0.0;
+    double sumY2 = 0.0;
+
+    for (Cooccurrence cooccurrence : cooccurrences) {
+      double x = cooccurrence.getValueA();
+      double y = cooccurrence.getValueB();
+
+      sumXY += x * y;
+      sumX += x;
+      sumX2 += x * x;
+      sumY += y;
+      sumY2 += y * y;
+      count++;
+    }
+
+    if (sumXY == 0.0) {
+      return Double.NaN;
+    }
+
+    /* center the data */
+    double n = count;
+    double meanX = sumX / n;
+    double meanY = sumY / n;
+    double centeredSumXY = sumXY - meanY * sumX;
+    double centeredSumX2 = sumX2 - meanX * sumX;
+    double centeredSumY2 = sumY2 - meanY * sumY;
+
+    double denominator = Math.sqrt(centeredSumX2) * Math.sqrt(centeredSumY2);
+    if (denominator == 0.0) {
+      /* One or both vectors has -all- the same values;
+       * can't really say much similarity under this measure */
+      return Double.NaN;
+    }
+
+    return centeredSumXY / denominator;
+  }
+
+}



Mime
View raw message