mahout-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r1145213 - in /mahout/trunk: core/src/main/java/org/apache/mahout/graph/common/ core/src/main/java/org/apache/mahout/graph/linkanalysis/ core/src/main/java/org/apache/mahout/graph/model/ core/src/test/java/org/apache/mahout/graph/ core/src/...
Date: Mon, 11 Jul 2011 15:48:17 GMT
Author: ssc
Date: Mon Jul 11 15:48:16 2011
New Revision: 1145213

URL: http://svn.apache.org/viewvc?rev=1145213&view=rev
Log:
MAHOUT-742 Pagerank implementation in Map/Reduce

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/GraphUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Edge.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
Modified:
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
    mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/GraphUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/GraphUtils.java?rev=1145213&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/GraphUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/GraphUtils.java Mon Jul 11 15:48:16 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.model.Vertex;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/** Helper methods for working with graphs. */
+public class GraphUtils {
+
+  private GraphUtils() {}
+
+  public static int indexVertices(Configuration conf, Path verticesPath, Path indexPath) throws IOException {
+    FileSystem fs = FileSystem.get(verticesPath.toUri(), conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, indexPath, IntWritable.class, Vertex.class);
+    int index = 0;
+
+    try {
+      for (FileStatus fileStatus : fs.listStatus(verticesPath)) {
+        InputStream in = fs.open(fileStatus.getPath());
+        try {
+          for (String line : new FileLineIterable(in)) {
+            writer.append(new IntWritable(index++), new Vertex(Long.parseLong(line)));
+          }
+        } finally {
+          Closeables.closeQuietly(in);
+        }
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    return index;
+  }
+}
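
For orientation, a minimal sketch of how this helper is driven (the paths here are hypothetical; the toy test further below uses temp files instead):

    Configuration conf = new Configuration();
    // the vertices file lists one numeric vertex id per line
    int numVertices = GraphUtils.indexVertices(conf,
        new Path("/graph/vertices.txt"),          // hypothetical input path
        new Path("/graph/indexedVertices.seq"));  // hypothetical index output
    // returns the number of vertices written to the index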

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java?rev=1145213&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java Mon Jul 11 15:48:16 2011
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.linkanalysis;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.graph.model.Edge;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+import com.google.common.io.Closeables;
+import org.apache.mahout.math.map.OpenLongIntHashMap;
+
+/**
+ * <p>Distributed computation of the PageRank of a directed graph</p>
+ *
+ * <p>The input files need to be a {@link org.apache.hadoop.io.SequenceFile} with {@link Edge}s as keys and
+ * any Writable as values and another {@link org.apache.hadoop.io.SequenceFile} with {@link IntWritable}s as keys and {@link Vertex} as
+ * values, as produced by {@link org.apache.mahout.graph.common.GraphUtils#indexVertices(Configuration, Path, Path)}</p>
+ *
+ * <p>This job outputs text files with a vertex id and its PageRank per line.</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>-Dmapred.output.dir=(path): output path where the PageRank data should be written</li>
+ * <li>--vertexIndex=(path): Directory containing vertex index as created by GraphUtils.indexVertices()</li>
+ * <li>--edges=(path): Directory containing edges of the graph</li>
+ * <li>--numVertices=(Integer): number of vertices in the graph</li>
+ * <li>--numIterations=(Integer): number of iterations, default: 5</li>
+ * <li>--teleportationProbability=(Double): probability to teleport to a random vertex, default: 0.8</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other arguments.</p>
+ */
+public class PageRankJob extends AbstractJob {
+
+  static final String TMP_INDEXED_DEGREES = "indexedDegrees";
+  static final String TMP_TRANSITION_MATRIX = "transitionMatrix";
+  static final String TMP_PAGERANK_VECTOR = "pageRankVector";
+
+  static final String NUM_VERTICES_PARAM = PageRankJob.class.getName() + ".numVertices";
+  static final String VERTEX_INDEX_PARAM = PageRankJob.class.getName() + ".vertexIndex";
+  static final String INDEXED_DEGREES_PARAM = PageRankJob.class.getName() + ".indexedDegrees";
+  static final String TELEPORTATION_PROBABILITY_PARAM = PageRankJob.class.getName() + ".teleportationProbability";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new PageRankJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addOutputOption();
+    addOption("vertexIndex", "vi", "vertex index as created by GraphUtils.indexVertices()",
true);
+    addOption("edges", "e", "edges of the graph", true);
+    addOption("numVertices", "nn", "number of vertices in the graph", true);
+    addOption("numIterations", "it", "number of numIterations", String.valueOf(5));
+    addOption("teleportationProbability", "tp", "probability to teleport to a random vertex",
String.valueOf(0.8));
+
+    Map<String, String> parsedArgs = super.parseArguments(args);
+
+    Path vertexIndex = new Path(parsedArgs.get("--vertexIndex"));
+    Path edges = new Path(parsedArgs.get("--edges"));
+
+    int numVertices = Integer.parseInt(parsedArgs.get("--numVertices"));
+    int numIterations = Integer.parseInt(parsedArgs.get("--numIterations"));
+    double teleportationProbability = Double.parseDouble(parsedArgs.get("--teleportationProbability"));
+
+    Preconditions.checkArgument(numVertices > 0);
+    Preconditions.checkArgument(numIterations > 0);
+    Preconditions.checkArgument(teleportationProbability > 0 && teleportationProbability < 1);
+
+    Job indexedDegrees = prepareJob(edges, getTempPath(TMP_INDEXED_DEGREES), SequenceFileInputFormat.class,
+        IndexAndCountDegreeMapper.class, IntWritable.class, IntWritable.class, IntSumReducer.class, IntWritable.class,
+        IntWritable.class, SequenceFileOutputFormat.class);
+    indexedDegrees.getConfiguration().set(NUM_VERTICES_PARAM, String.valueOf(numVertices));
+    indexedDegrees.getConfiguration().set(VERTEX_INDEX_PARAM, vertexIndex.toString());
+    indexedDegrees.setCombinerClass(IntSumReducer.class);
+    indexedDegrees.waitForCompletion(true);
+
+    Job createTransitionMatrix = prepareJob(edges, getTempPath(TMP_TRANSITION_MATRIX),
+        SequenceFileInputFormat.class, RevertEdgesMapper.class, IntWritable.class, IntWritable.class,
+        CreateTransitionMatrixReducer.class, IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    createTransitionMatrix.getConfiguration().set(NUM_VERTICES_PARAM, String.valueOf(numVertices));
+    createTransitionMatrix.getConfiguration().set(VERTEX_INDEX_PARAM, vertexIndex.toString());
+    createTransitionMatrix.getConfiguration().set(INDEXED_DEGREES_PARAM,
+        getTempPath(TMP_INDEXED_DEGREES).toString());
+    createTransitionMatrix.getConfiguration().set(TELEPORTATION_PROBABILITY_PARAM,
+        String.valueOf(teleportationProbability));
+    createTransitionMatrix.waitForCompletion(true);
+
+    DistributedRowMatrix matrix = new DistributedRowMatrix(getTempPath(TMP_TRANSITION_MATRIX), getTempPath(),
+        numVertices, numVertices);
+    matrix.setConf(getConf());
+
+    Vector pageRank = new DenseVector(numVertices).assign(1.0 / numVertices);
+    Vector dampingVector = new DenseVector(numVertices).assign((1.0 - teleportationProbability) / numVertices);
+
+    while (numIterations-- > 0) {
+      pageRank = matrix.times(pageRank).plus(dampingVector);
+    }
+
+    FileSystem fs = FileSystem.get(getTempPath(TMP_PAGERANK_VECTOR).toUri(), getConf());
+    DataOutputStream stream = fs.create(getTempPath(TMP_PAGERANK_VECTOR), true);
+    try {
+      VectorWritable.writeVector(stream, pageRank);
+    } finally {
+      Closeables.closeQuietly(stream);
+    }
+
+    Job vertexWithPageRank = prepareJob(vertexIndex, getOutputPath(), SequenceFileInputFormat.class,
+        VertexWithPageRankMapper.class, LongWritable.class, DoubleWritable.class, Reducer.class, LongWritable.class,
+        DoubleWritable.class, TextOutputFormat.class);
+    vertexWithPageRank.getConfiguration().set(VertexWithPageRankMapper.PAGERANK_PATH_PARAM,
+        getTempPath(TMP_PAGERANK_VECTOR).toString());
+    vertexWithPageRank.waitForCompletion(true);
+
+    return 1;
+  }
+
+  public static class IndexAndCountDegreeMapper extends Mapper<Edge,Writable,IntWritable,IntWritable> {
+
+    private OpenLongIntHashMap vertexIDsToIndex;
+
+    private static final IntWritable ONE = new IntWritable(1);
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      Configuration conf = ctx.getConfiguration();
+      int numVertices = Integer.parseInt(conf.get(NUM_VERTICES_PARAM));
+      Path vertexIndexPath = new Path(conf.get(VERTEX_INDEX_PARAM));
+      vertexIDsToIndex = new OpenLongIntHashMap(numVertices);
+      for (Pair<IntWritable,Vertex> indexAndVertexID :
+          new SequenceFileIterable<IntWritable,Vertex>(vertexIndexPath, true, conf)) {
+        vertexIDsToIndex.put(indexAndVertexID.getSecond().getId(), indexAndVertexID.getFirst().get());
+      }
+    }
+
+    @Override
+    protected void map(Edge edge, Writable value, Context ctx) throws IOException, InterruptedException {
+      int startIndex = vertexIDsToIndex.get(edge.startVertex().getId());
+      ctx.write(new IntWritable(startIndex), ONE);
+    }
+  }
+
+  public static class RevertEdgesMapper extends Mapper<Edge,Writable,IntWritable,IntWritable> {
+
+    private OpenLongIntHashMap vertexIDsToIndex;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      Configuration conf = ctx.getConfiguration();
+      int numVertices = Integer.parseInt(conf.get(NUM_VERTICES_PARAM));
+      Path vertexIndexPath = new Path(conf.get(VERTEX_INDEX_PARAM));
+      vertexIDsToIndex = new OpenLongIntHashMap(numVertices);
+      for (Pair<IntWritable,Vertex> indexAndVertexID :
+          new SequenceFileIterable<IntWritable,Vertex>(vertexIndexPath, true, conf)) {
+        vertexIDsToIndex.put(indexAndVertexID.getSecond().getId(), indexAndVertexID.getFirst().get());
+      }
+    }
+
+    @Override
+    protected void map(Edge edge, Writable value, Context ctx) throws IOException, InterruptedException {
+      int startIndex = vertexIDsToIndex.get(edge.startVertex().getId());
+      int endIndex = vertexIDsToIndex.get(edge.endVertex().getId());
+      ctx.write(new IntWritable(endIndex), new IntWritable(startIndex));
+    }
+  }
+
+  public static class CreateTransitionMatrixReducer
+      extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {
+
+    private int numVertices;
+    private double teleportationProbability;
+    private Vector weights;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      Configuration conf = ctx.getConfiguration();
+      Path indexedDegreesPath = new Path(ctx.getConfiguration().get(INDEXED_DEGREES_PARAM));
+      numVertices = Integer.parseInt(conf.get(NUM_VERTICES_PARAM));
+      teleportationProbability = Double.parseDouble(conf.get(TELEPORTATION_PROBABILITY_PARAM));
+      Preconditions.checkArgument(numVertices > 0);
+      Preconditions.checkArgument(teleportationProbability > 0 && teleportationProbability < 1);
+      weights = new DenseVector(numVertices);
+
+      for (Pair<IntWritable, IntWritable> indexAndDegree :
+          new SequenceFileDirIterable<IntWritable, IntWritable>(indexedDegreesPath, PathType.LIST,
+          PathFilters.partFilter(), ctx.getConfiguration())) {
+        weights.set(indexAndDegree.getFirst().get(), 1.0 / indexAndDegree.getSecond().get());
+      }
+    }
+
+    @Override
+    protected void reduce(IntWritable vertexIndex, Iterable<IntWritable> incidentVertexIndexes, Context ctx)
+        throws IOException, InterruptedException {
+      Vector vector = new RandomAccessSparseVector(numVertices);
+      for (IntWritable incidentVertexIndex : incidentVertexIndexes) {
+        double weight = weights.get(incidentVertexIndex.get()) * teleportationProbability;
+        vector.set(incidentVertexIndex.get(), weight);
+      }
+      ctx.write(vertexIndex, new VectorWritable(vector));
+    }
+  }
+
+  public static class VertexWithPageRankMapper extends Mapper<IntWritable,Vertex,LongWritable,DoubleWritable> {
+
+    static final String PAGERANK_PATH_PARAM = VertexWithPageRankMapper.class.getName() + ".pageRankPath";
+
+    private Vector pageRanks;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      Path pageRankPath = new Path(ctx.getConfiguration().get(PAGERANK_PATH_PARAM));
+      DataInputStream in = FileSystem.get(pageRankPath.toUri(), ctx.getConfiguration()).open(pageRankPath);
+      try {
+        pageRanks = VectorWritable.readVector(in);
+      } finally {
+        Closeables.closeQuietly(in);
+      }
+    }
+
+    @Override
+    protected void map(IntWritable index, Vertex vertex, Context ctx) throws IOException, InterruptedException {
+      ctx.write(new LongWritable(vertex.getId()), new DoubleWritable(pageRanks.get(index.get())));
+    }
+  }
+
+}
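
Putting the pieces together, a hedged sketch of a programmatic invocation, mirroring the toy test below (all paths are hypothetical):

    Configuration conf = new Configuration();
    PageRankJob pageRank = new PageRankJob();
    pageRank.setConf(conf);
    pageRank.run(new String[] {
        "--vertexIndex", "/graph/indexedVertices.seq",  // hypothetical paths
        "--edges", "/graph/edges.seq",
        "--output", "/graph/pageRank",
        "--numVertices", "4",
        "--numIterations", "3",
        "--teleportationProbability", "0.8",
        "--tempDir", "/tmp/pagerank" });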

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Edge.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Edge.java?rev=1145213&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Edge.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Edge.java Mon Jul 11 15:48:16 2011
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** A representation of a directed edge */
+public class Edge implements WritableComparable<Edge>, Cloneable {
+
+  private Vertex start;
+  private Vertex end;
+
+  public Edge() {}
+
+  public Edge(Vertex start, Vertex end) {
+    this.start = start;
+    this.end = end;
+  }
+
+  public Edge(long startVertexID, long endVertexID) {
+    this(new Vertex(startVertexID), new Vertex(endVertexID));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    start.write(out);
+    end.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    start = new Vertex();
+    start.readFields(in);
+    end = new Vertex();
+    end.readFields(in);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Edge) {
+      Edge other = (Edge) o;
+      return start.equals(other.start) && end.equals(other.end);
+    }
+    return false;
+  }
+
+  public Vertex startVertex() {
+    return start;
+  }
+
+  public Vertex endVertex() {
+    return end;
+  }
+
+  @Override
+  public int hashCode() {
+    return start.hashCode() + 31 * end.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "(" + start.getId() + "," + end.getId() + ")";
+  }
+
+  @Override
+  public int compareTo(Edge other) {
+    return ComparisonChain.start()
+        .compare(start, other.start)
+        .compare(end, other.end).result();
+  }
+
+  @Override
+  public Edge clone() {
+    return new Edge(start.clone(), end.clone());
+  }
+}
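
Since Edge implements WritableComparable, instances round-trip through Hadoop serialization; a minimal sketch using Hadoop's DataOutputBuffer/DataInputBuffer helpers:

    Edge edge = new Edge(12L, 34L);
    DataOutputBuffer out = new DataOutputBuffer();
    edge.write(out);                           // serialize both vertices

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    Edge copy = new Edge();
    copy.readFields(in);                       // deserialize into a fresh instance
    // copy.equals(edge) holds, per the value-based equals() above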

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java?rev=1145213&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java Mon Jul 11 15:48:16 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.common.MahoutTestCase;
+
+import java.io.File;
+import java.io.IOException;
+
+public abstract class GraphTestCase extends MahoutTestCase {
+
+  protected <T extends WritableComparable> void writeComponents(File destination, Configuration conf,
+      Class<T> componentClass, T... components) throws IOException {
+    Path path = new Path(destination.getAbsolutePath());
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, componentClass, NullWritable.class);
+    try {
+      for (T component : components) {
+        writer.append(component, NullWritable.get());
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java?rev=1145213&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java Mon Jul 11 15:48:16 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.linkanalysis;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.GraphTestCase;
+import org.apache.mahout.graph.common.GraphUtils;
+import org.apache.mahout.graph.model.Edge;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.map.OpenLongIntHashMap;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+/** example from "Mining Massive Datasets" */
+public class PageRankJobTest extends GraphTestCase {
+
+  @Test
+  public void indexAndCountDegree() throws Exception {
+
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(new IntWritable(7), new IntWritable(1));
+
+    EasyMock.replay(ctx);
+
+    OpenLongIntHashMap index = new OpenLongIntHashMap();
+    index.put(123L, 7);
+    PageRankJob.IndexAndCountDegreeMapper indexAndCountDegreeMapper = new PageRankJob.IndexAndCountDegreeMapper();
+    setField(indexAndCountDegreeMapper, "vertexIDsToIndex", index);
+    indexAndCountDegreeMapper.map(new Edge(123L, 456L), NullWritable.get(), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void toyIntegrationTest() throws Exception {
+
+    File verticesFile = getTestTempFile("vertices.txt");
+    File edgesFile = getTestTempFile("edges.seq");
+    File indexedVerticesFile = getTestTempFile("indexedVertices.seq");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir();
+
+    Configuration conf = new Configuration();
+
+    writeLines(verticesFile, "12", "34", "56", "78");
+
+    writeComponents(edgesFile, conf, Edge.class,
+        new Edge(12, 34),
+        new Edge(12, 56),
+        new Edge(12, 78),
+        new Edge(34, 12),
+        new Edge(34, 78),
+        new Edge(56, 56),
+        new Edge(78, 34),
+        new Edge(78, 56));
+
+    int numVertices = GraphUtils.indexVertices(conf, new Path(verticesFile.getAbsolutePath()),
+        new Path(indexedVerticesFile.getAbsolutePath()));
+
+    PageRankJob pageRank = new PageRankJob();
+    pageRank.setConf(conf);
+    pageRank.run(new String[] { "--vertexIndex", indexedVerticesFile.getAbsolutePath(),
+        "--edges", edgesFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--numVertices", String.valueOf(numVertices), "--numIterations", String.valueOf(3),
+        "--teleportationProbability", String.valueOf(0.8), "--tempDir", tempDir.getAbsolutePath()
});
+
+    DenseMatrix expectedTransitionMatrix = new DenseMatrix(new double[][]{
+        { 0,           0.4, 0,   0 },
+        { 0.266666667, 0,   0,   0.4 },
+        { 0.266666667, 0,   0.8, 0.4 },
+        { 0.266666667, 0.4, 0,   0 } });
+
+    Matrix actualTransitionMatrix = MathHelper.readEntries(conf, new Path(tempDir.getAbsolutePath(),
+        "transitionMatrix/part-r-00000"), numVertices, numVertices);
+
+    assertEquals(expectedTransitionMatrix, actualTransitionMatrix);
+
+    Map<Long,Double> rankPerVertex = Maps.newHashMap();
+    for (String line : new FileLineIterable(new File(outputDir, "part-r-00000"))) {
+      String[] tokens = Iterables.toArray(Splitter.on("\t").split(line), String.class);
+      rankPerVertex.put(Long.parseLong(tokens[0]), Double.parseDouble(tokens[1]));
+    }
+
+    assertEquals(4, rankPerVertex.size());
+    assertEquals(0.1206666, rankPerVertex.get(12L), EPSILON);
+    assertEquals(0.1571111, rankPerVertex.get(34L), EPSILON);
+    assertEquals(0.5651111, rankPerVertex.get(56L), EPSILON);
+    assertEquals(0.1571111, rankPerVertex.get(78L), EPSILON);
+
+  }
+
+  void assertEquals(Matrix expected, Matrix actual) {
+    assertEquals(expected.numRows(), actual.numRows());
+    assertEquals(expected.numCols(), actual.numCols());
+    for (int row = 0; row < expected.numRows(); row++) {
+      for (int col = 0; col < expected.numCols(); col++) {
+        assertEquals("Non-matching values in [" + row + "," + col + "]",
+            expected.get(row, col), actual.get(row, col), EPSILON);
+      }
+    }
+  }
+
+}
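
The loop in PageRankJob#run is a plain power iteration, p <- M*p + d, where M is the transition matrix already scaled by the damping factor in CreateTransitionMatrixReducer and d assigns (1 - teleportationProbability) / numVertices to every vertex. A minimal in-memory sketch of the same recurrence, using the expected transition matrix and the parameters (n = 4, factor 0.8, 3 iterations) from the test above, reproduces the asserted ranks:

    // transition matrix from the toy test, already scaled by 0.8
    double[][] m = {
        { 0,           0.4, 0,   0   },
        { 0.266666667, 0,   0,   0.4 },
        { 0.266666667, 0,   0.8, 0.4 },
        { 0.266666667, 0.4, 0,   0   } };
    double[] p = { 0.25, 0.25, 0.25, 0.25 };  // uniform start
    for (int it = 0; it < 3; it++) {
      double[] next = new double[4];
      for (int row = 0; row < 4; row++) {
        for (int col = 0; col < 4; col++) {
          next[row] += m[row][col] * p[col];  // M * p
        }
        next[row] += (1.0 - 0.8) / 4;         // plus the damping vector
      }
      p = next;
    }
    // p is now approximately { 0.1206666, 0.1571111, 0.5651111, 0.1571111 }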

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=1145213&r1=1145212&r2=1145213&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Mon Jul 11 15:48:16 2011
@@ -43,8 +43,7 @@ import org.easymock.EasyMock;
  */
 public final class MathHelper {
 
-  private MathHelper() {
-  }
+  private MathHelper() {}
 
   /**
    * applies an {@link IArgumentMatcher} to {@link MatrixEntryWritable}s

Modified: mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java
URL: http://svn.apache.org/viewvc/mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java?rev=1145213&r1=1145212&r2=1145213&view=diff
==============================================================================
--- mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java (original)
+++ mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java Mon Jul 11
15:48:16 2011
@@ -601,6 +601,8 @@ public abstract class AbstractVector imp
     }
     if (result.length() > 1) {
       result.setCharAt(result.length() - 1, '}');
+    } else {
+      result.append('}');
     }
     return result.toString();
   }
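
The AbstractVector change covers a corner case in toString(): when the vector has no non-zero entries to print, the buffer still holds only the opening brace (length 1), so the closing brace was never appended. A quick illustration of the fixed behavior (the concrete vector class here is just an example):

    Vector empty = new RandomAccessSparseVector(5);
    // before this fix: "{"; after: "{}"
    String formatted = empty.toString();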


