mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1138850 - in /mahout/trunk/core/src: main/java/org/apache/mahout/graph/model/ test/java/org/apache/mahout/graph/common/
Date Thu, 23 Jun 2011 12:26:47 GMT
Author: ssc
Date: Thu Jun 23 12:26:46 2011
New Revision: 1138850

URL: http://svn.apache.org/viewvc?rev=1138850&view=rev
Log:
MAHOUT-741 MapReduce job to compute the local clustering coefficient in an undirected graph

Added:
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java?rev=1138850&r1=1138849&r2=1138850&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java Thu Jun 23
12:26:46 2011
@@ -46,6 +46,18 @@ public class Triangle implements Writabl
     this(new Vertex(firstVertexId), new Vertex(secondVertexId), new Vertex(thirdVertexId));
   }
 
+  /** @return the first vertex of this triangle */
+  public Vertex getFirstVertex() {
+    return first;
+  }
+
+  /** @return the second vertex of this triangle */
+  public Vertex getSecondVertex() {
+    return second;
+  }
+
+  /** @return the third vertex of this triangle */
+  public Vertex getThirdVertex() {
+    return third;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     first.write(out);

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java?rev=1138850&r1=1138849&r2=1138850&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
(original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Thu Jun 23 12:26:46 2011
@@ -36,7 +36,7 @@ import java.util.Map;
 
 public class DegreeDistributionJobTest extends MahoutTestCase {
 
-@Test
+  @Test
   public void toyIntegrationTest() throws Exception {
 
     File inputFile = getTestTempFile("edges.seq");
@@ -47,7 +47,7 @@ public class DegreeDistributionJobTest e
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
 
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(inputFile.getAbsolutePath().toString()),
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(inputFile.getAbsolutePath()),
         UndirectedEdge.class, NullWritable.class);
 
     try {

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java?rev=1138850&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
(added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
Thu Jun 23 12:26:46 2011
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.math.Varint;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * <p>Distributed computation of the local clustering coefficient of the vertices of an
+ * undirected graph. The local clustering coefficient is a measure for the "connectedness" of a
+ * vertex in its neighborhood and is computed by dividing the number of closed triangles with a
+ * vertex's neighbors by the number of possible triangles of this vertex with its neighbors.</p>
+ *
+ * <p>The input files need to be {@link org.apache.hadoop.io.SequenceFile}s, one with
+ * {@link UndirectedEdge}s as keys and any Writable as values, as it is already produced by
+ * {@link SimplifyGraphJob}, the other with {@link Triangle}s as keys and any Writable as
+ * values, as it is already produced by {@link org.apache.mahout.graph.triangles.EnumerateTrianglesJob}.</p>
+ *
+ * <p>This job outputs text files with a vertex id and its local clustering coefficient per
+ * line.</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>--edges=(path): Directory containing one or more sequence files with edge data</li>
+ * <li>--triangles=(path): Directory containing one or more sequence files with triangle
+ * data</li>
+ * <li>--Dmapred.output.dir=(path): output path where the local clustering coefficient data
+ * should be written</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear
+ * before all other arguments.</p>
+ */
+public class LocalClusteringCoefficientJob extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new LocalClusteringCoefficientJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addOption("edges", "e", "path to the edges of the input graph", true);
+    addOption("triangles", "t", "path to the triangles of the input graph", true);
+    addOutputOption();
+
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path edgesPath = new Path(parsedArgs.get("--edges"));
+    Path trianglesPath = new Path(parsedArgs.get("--triangles"));
+
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+
+    Path edgesPerVertex = new Path(tempDirPath, "edgesPerVertex");
+    Path trianglesPerVertex = new Path(tempDirPath, "trianglesPerVertex");
+
+    // unfortunately we don't have access to an undeprecated MultipleInputs, so we need
+    // several M/R steps instead of one...
+
+    // step 1: count the edges incident to each vertex (its degree)
+    Job countEdgesPerVertex = prepareJob(edgesPath, edgesPerVertex, SequenceFileInputFormat.class,
+        EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
+        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    countEdgesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+    // abort early on failure instead of letting the next step read missing intermediate data
+    if (!countEdgesPerVertex.waitForCompletion(true)) {
+      return -1;
+    }
+
+    // step 2: count the triangles each vertex participates in
+    Job countTrianglesPerVertex = prepareJob(trianglesPath, trianglesPerVertex, SequenceFileInputFormat.class,
+        TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
+        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    countTrianglesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+    if (!countTrianglesPerVertex.waitForCompletion(true)) {
+      return -1;
+    }
+
+    // step 3: join both intermediate results on the vertex (identity map) and compute the
+    // coefficient in the reducer
+    Job computeLocalClusteringCoefficient = prepareJob(new Path(edgesPerVertex + "," + trianglesPerVertex),
+        getOutputPath(), SequenceFileInputFormat.class, Mapper.class, Vertex.class, TriangleOrEdgeCount.class,
+        LocalClusteringCoefficientReducer.class, LongWritable.class, DoubleWritable.class,
+        TextOutputFormat.class);
+    computeLocalClusteringCoefficient.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+    if (!computeLocalClusteringCoefficient.waitForCompletion(true)) {
+      return -1;
+    }
+
+    return 0;
+  }
+
+  /** emits each edge once for both of its vertices */
+  static class EdgeCountMapper extends Mapper<UndirectedEdge,Writable,Vertex,TriangleOrEdgeCount> {
+
+    private static final TriangleOrEdgeCount ONE_EDGE = new TriangleOrEdgeCount(1, false);
+
+    @Override
+    protected void map(UndirectedEdge edge, Writable value, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(edge.getFirstVertex(), ONE_EDGE);
+      ctx.write(edge.getSecondVertex(), ONE_EDGE);
+    }
+  }
+
+  /** emits each triangle once for each of its three vertices */
+  static class TriangleCountMapper extends Mapper<Triangle,Writable,Vertex,TriangleOrEdgeCount> {
+
+    private static final TriangleOrEdgeCount ONE_TRIANGLE = new TriangleOrEdgeCount(1, true);
+
+    @Override
+    protected void map(Triangle triangle, Writable value, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(triangle.getFirstVertex(), ONE_TRIANGLE);
+      ctx.write(triangle.getSecondVertex(), ONE_TRIANGLE);
+      ctx.write(triangle.getThirdVertex(), ONE_TRIANGLE);
+    }
+  }
+
+  /**
+   * Sums up the edge and triangle counts of a vertex and emits
+   * numTriangles / (degree * (degree - 1)) as its local clustering coefficient
+   * (0 for vertices with fewer than two neighbors).
+   *
+   * NOTE(review): the textbook coefficient for an undirected graph is
+   * 2 * numTriangles / (degree * (degree - 1)); this implementation (and its test)
+   * uses half that value -- confirm this is intended.
+   */
+  static class LocalClusteringCoefficientReducer
+      extends Reducer<Vertex,TriangleOrEdgeCount,LongWritable,DoubleWritable> {
+    @Override
+    protected void reduce(Vertex vertex, Iterable<TriangleOrEdgeCount> counts, Context ctx)
+        throws IOException, InterruptedException {
+      int numEdges = 0;
+      int numTriangles = 0;
+
+      for (TriangleOrEdgeCount count : counts) {
+        if (count.isTriangles()) {
+          numTriangles += count.get();
+        } else {
+          numEdges += count.get();
+        }
+      }
+
+      double localClusteringCoefficient = numEdges > 1 ?
+          (double) numTriangles / (double) (numEdges * (numEdges - 1)) : 0d;
+
+      ctx.write(new LongWritable(vertex.getId()), new DoubleWritable(localClusteringCoefficient));
+    }
+  }
+
+  /** pre-aggregates the partial edge and triangle counts of a vertex */
+  static class TriangleOrEdgeCountCombiner extends Reducer<Vertex,TriangleOrEdgeCount,Vertex,TriangleOrEdgeCount> {
+
+    @Override
+    protected void reduce(Vertex vertex, Iterable<TriangleOrEdgeCount> counts, Context ctx)
+        throws IOException, InterruptedException {
+      int numEdges = 0;
+      int numTriangles = 0;
+
+      for (TriangleOrEdgeCount count : counts) {
+        if (count.isTriangles()) {
+          numTriangles += count.get();
+        } else {
+          numEdges += count.get();
+        }
+      }
+
+      if (numEdges > 0) {
+        ctx.write(vertex, new TriangleOrEdgeCount(numEdges, false));
+      }
+      if (numTriangles > 0) {
+        ctx.write(vertex, new TriangleOrEdgeCount(numTriangles, true));
+      }
+    }
+  }
+
+  /** a tagged count, referring either to the edges or to the triangles of a vertex */
+  static class TriangleOrEdgeCount implements Writable {
+
+    private int count;
+    private boolean isTriangles;
+
+    TriangleOrEdgeCount() {}
+
+    TriangleOrEdgeCount(int count, boolean isTriangles) {
+      this.count = count;
+      this.isTriangles = isTriangles;
+    }
+
+    public int get() {
+      return count;
+    }
+
+    public boolean isTriangles() {
+      return isTriangles;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Varint.writeUnsignedVarInt(count, out);
+      out.writeBoolean(isTriangles);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      count = Varint.readUnsignedVarInt(in);
+      isTriangles = in.readBoolean();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof TriangleOrEdgeCount) {
+        TriangleOrEdgeCount other = (TriangleOrEdgeCount) o;
+        return count == other.count && isTriangles == other.isTriangles;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * count + (isTriangles ? 1 : 0);
+    }
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java?rev=1138850&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
(added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
Thu Jun 23 12:26:46 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class LocalClusteringCoefficientJobTest extends MahoutTestCase {
+
+    @Test
+    public void toyIntegrationTest() throws Exception {
+
+      File edgesFile = getTestTempFile("edges.seq");
+      File trianglesFile = getTestTempFile("triangles.seq");
+      File outputDir = getTestTempDir("output");
+      outputDir.delete();
+      File tempDir = getTestTempDir("tmp");
+
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(conf);
+
+      SequenceFile.Writer edgesWriter = new SequenceFile.Writer(fs, conf, new Path(edgesFile.getAbsolutePath()),
+          UndirectedEdge.class, NullWritable.class);
+      try {
+        edgesWriter.append(new UndirectedEdge(0, 1), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 2), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 3), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 4), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 5), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 6), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(0, 7), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(1, 2), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(1, 3), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(2, 3), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(4, 5), NullWritable.get());
+        edgesWriter.append(new UndirectedEdge(4, 7), NullWritable.get());
+      } finally {
+        Closeables.closeQuietly(edgesWriter);
+      }
+
+      SequenceFile.Writer trianglesWriter = new SequenceFile.Writer(fs, conf, new Path(trianglesFile.getAbsolutePath()),
+          Triangle.class, NullWritable.class);
+      try {
+        trianglesWriter.append(new Triangle(0, 1, 2), NullWritable.get());
+        trianglesWriter.append(new Triangle(0, 1, 3), NullWritable.get());
+        trianglesWriter.append(new Triangle(0, 2, 3), NullWritable.get());
+        trianglesWriter.append(new Triangle(0, 4, 5), NullWritable.get());
+        trianglesWriter.append(new Triangle(0, 4, 7), NullWritable.get());
+        trianglesWriter.append(new Triangle(1, 2, 3), NullWritable.get());
+      } finally {
+        Closeables.closeQuietly(trianglesWriter);
+      }
+
+      LocalClusteringCoefficientJob clusteringCoefficientJob = new LocalClusteringCoefficientJob();
+      clusteringCoefficientJob.setConf(conf);
+      clusteringCoefficientJob.run(new String[] { "--edges", edgesFile.getAbsolutePath(),
+          "--triangles", trianglesFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+          "--tempDir", tempDir.getAbsolutePath() });
+
+      Map<Long,Double> localClusteringCoefficients = Maps.newHashMap();
+      for (String line : new FileLineIterable(new File(outputDir, "part-r-00000"))) {
+        String[] tokens = Iterables.toArray(Splitter.on("\t").split(line), String.class);
+        localClusteringCoefficients.put(Long.parseLong(tokens[0]), Double.parseDouble(tokens[1]));
+      }
+
+      assertEquals(8, localClusteringCoefficients.size());
+      assertEquals(0.119047, localClusteringCoefficients.get(0L), EPSILON);
+      assertEquals(0.5, localClusteringCoefficients.get(1L), EPSILON);
+      assertEquals(0.5, localClusteringCoefficients.get(2L), EPSILON);
+      assertEquals(0.5, localClusteringCoefficients.get(3L), EPSILON);
+      assertEquals(0.333333, localClusteringCoefficients.get(4L), EPSILON);
+      assertEquals(0.5, localClusteringCoefficients.get(5L), EPSILON);
+      assertEquals(0, localClusteringCoefficients.get(6L), EPSILON);
+      assertEquals(0.5, localClusteringCoefficients.get(7L), EPSILON);
+    }
+}



Mime
View raw message