mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1137447 - in /mahout/trunk/core: ./ src/main/java/org/apache/mahout/graph/ src/main/java/org/apache/mahout/graph/common/ src/main/java/org/apache/mahout/graph/model/ src/main/java/org/apache/mahout/graph/triangles/ src/test/java/org/apache...
Date Sun, 19 Jun 2011 21:19:25 GMT
Author: ssc
Date: Sun Jun 19 21:19:24 2011
New Revision: 1137447

URL: http://svn.apache.org/viewvc?rev=1137447&view=rev
Log:
MAHOUT-710 graph primitives and triangle enumeration code

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/package-info.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdge.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdgeWithDegrees.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Vertex.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/VertexWithDegree.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/package-info.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/package-info.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/JoinableUndirectedEdge.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/VertexOrMarker.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/package-info.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/SimplifyGraphJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJobTest.java
Modified:
    mahout/trunk/core/   (props changed)
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/TasteTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java

Propchange: mahout/trunk/core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sun Jun 19 21:19:24 2011
@@ -1,14 +1,13 @@
 .pmd
-.classpath
 test
-*.iml
-testdatatestdata
 .project
 .ruleset
-.checkstyle
-dist
-.settings
 build
+.settings
 input
+.classpath
+*.iml
+testdatatestdata
 target
-testdata
+.checkstyle
+dist

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.UndirectedEdgeWithDegrees;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.graph.model.VertexWithDegree;
+
+/**
+ * Augments a graph with degree information for each vertex, where the degree of a vertex is the number
+ * of {@link org.apache.mahout.graph.model.UndirectedEdge}s that point to or from that vertex.
+ */
+public class AugmentGraphWithDegreesJob extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new AugmentGraphWithDegreesJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+
+    Path inputPath = getInputPath();
+    Path augmentedEdgesPath = new Path(tempDirPath, "augmented-edges");
+    Path outputPath = getOutputPath();
+
+    // scatter the edges to each of the vertices and count degree
+    Job scatter = prepareJob(inputPath, augmentedEdgesPath, SequenceFileInputFormat.class, ScatterEdgesMapper.class,
+        Vertex.class, Vertex.class, SumDegreesReducer.class, UndirectedEdge.class, VertexWithDegree.class,
+        SequenceFileOutputFormat.class);
+    scatter.waitForCompletion(true);
+
+    // join augmented edges with partial degree information to complete records
+    Job join = prepareJob(augmentedEdgesPath, outputPath, SequenceFileInputFormat.class, Mapper.class,
+        UndirectedEdge.class, VertexWithDegree.class, JoinDegreesReducer.class, UndirectedEdgeWithDegrees.class,
+        NullWritable.class, SequenceFileOutputFormat.class);
+    join.waitForCompletion(true);
+
+    return 0;
+  }
+
+  /** Sends every edge to each of its two vertices, emitting the opposite vertex as the value */
+  public static class ScatterEdgesMapper extends Mapper<UndirectedEdge,Object,Vertex,Vertex> {
+    @Override
+    protected void map(UndirectedEdge edge, Object value, Context ctx) throws IOException, InterruptedException {
+      ctx.write(edge.getFirstVertex(), edge.getSecondVertex());
+      ctx.write(edge.getSecondVertex(), edge.getFirstVertex());
+    }
+  }
+
+  /** Sums up the count of edges for each vertex and augments all edges with the degree information for the key vertex */
+  public static class SumDegreesReducer extends Reducer<Vertex, Vertex, UndirectedEdge, VertexWithDegree> {
+    @Override
+    protected void reduce(Vertex vertex, Iterable<Vertex> connectedVertices, Context ctx)
+        throws IOException, InterruptedException {
+      FastIDSet connectedVertexIds = new FastIDSet();
+      for (Vertex connectedVertex : connectedVertices) {
+        connectedVertexIds.add(connectedVertex.getId());
+      }
+
+      int degree = connectedVertexIds.size();
+      VertexWithDegree vertexWithDegree = new VertexWithDegree(vertex, degree);
+      LongPrimitiveIterator connectedVertexIdsIterator = connectedVertexIds.iterator();
+      while (connectedVertexIdsIterator.hasNext()) {
+        Vertex connectedVertex = new Vertex(connectedVertexIdsIterator.nextLong());
+        ctx.write(new UndirectedEdge(vertex, connectedVertex), vertexWithDegree);
+      }
+    }
+  }
+
+  /** Joins identical edges assuring degree augmentations for both nodes */
+  public static class JoinDegreesReducer
+      extends Reducer<UndirectedEdge,VertexWithDegree,UndirectedEdgeWithDegrees,NullWritable> {
+    @Override
+    protected void reduce(UndirectedEdge edge, Iterable<VertexWithDegree> verticesWithDegrees, Context ctx)
+        throws IOException, InterruptedException {
+      Iterator<VertexWithDegree> iterator = verticesWithDegrees.iterator();
+      VertexWithDegree firstVertexWithDegree = iterator.next().clone();
+      VertexWithDegree secondVertexWithDegree = iterator.next().clone();
+      ctx.write(new UndirectedEdgeWithDegrees(firstVertexWithDegree, secondVertexWithDegree), NullWritable.get());
+    }
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+
+/**
+ * Simplifies a graph. That is: removes loops and aggregates edges to {@link org.apache.mahout.graph.model.UndirectedEdge}s. The input
+ * file is read with {@link TextInputFormat}, one comma-separated pair of vertex ids per line.
+ * 
+ * This job accepts two input arguments
+ * 
+ * <pre>
+ *  input
+ *  output
+ * </pre>
+ * 
+ * The output is a {@link SequenceFile} containing a {@link org.apache.mahout.graph.model.UndirectedEdge} as key
+ * and a {@link NullWritable} as value.
+ */
+public class SimplifyGraphJob extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SimplifyGraphJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path inputPath = getInputPath();
+    Path outputPath = getOutputPath();
+
+    Job simplify = prepareJob(inputPath, outputPath, TextInputFormat.class, SimplifyGraphMapper.class,
+        UndirectedEdge.class, NullWritable.class, SimplifyGraphReducer.class, UndirectedEdge.class, NullWritable.class,
+        SequenceFileOutputFormat.class);
+    simplify.waitForCompletion(true);
+
+    return 0;
+  }
+
+  /** Parses comma-separated vertex id pairs into undirected edges, dropping loops and non-numeric lines. */
+  public static class SimplifyGraphMapper extends Mapper<Object, Text, UndirectedEdge, NullWritable> {
+
+    private static final Pattern SEPARATOR = Pattern.compile(",");
+    @Override
+    public void map(Object key, Text line, Context ctx) throws IOException, InterruptedException {
+      try {
+        String[] tokens = SEPARATOR.split(line.toString());
+        Pair<Vertex,Vertex> vertices = new Pair<Vertex,Vertex>(new Vertex(Long.parseLong(tokens[0])),
+            new Vertex(Long.parseLong(tokens[1])));
+        Vertex one = vertices.getFirst();
+        Vertex two = vertices.getSecond();
+        // remove loops and un-direct edges
+        if (!one.equals(two)) {
+          ctx.write(new UndirectedEdge(one, two), NullWritable.get());
+        }
+      } catch (NumberFormatException e) {
+        //ignore unparseable lines
+      }
+    }
+  }
+
+  public static class SimplifyGraphReducer extends Reducer<UndirectedEdge, NullWritable, UndirectedEdge, NullWritable> {
+    @Override
+    protected void reduce(UndirectedEdge edge, Iterable<NullWritable> values, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(edge, NullWritable.get());
+    }
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/package-info.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/package-info.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/package-info.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/package-info.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,10 @@
+/**
+ * This package contains some common tasks for graph algorithms. It proposes a standard tool chain to process a graph:
+ * <ol>
+ * 	<li>Simplify the graph with {@link org.apache.mahout.graph.common.SimplifyGraphJob}. Parse a text file to a small representation of
+ * 	edges without loops and duplicate edges. After this step the graph is interpreted as an undirected graph.</li>
+ * 	<li>Augment the graph with vertex degrees using {@link AugmentGraphWithDegreesJob}, which is implemented as a two-step MapReduce
+ * 		pipeline.</li>
+ * </ol>
+ */
+package org.apache.mahout.graph.common;

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class Triangle implements Writable {
+
+  private Vertex first;
+  private Vertex second;
+  private Vertex third;
+
+  public Triangle() {}
+
+  public Triangle(Vertex first, Vertex second, Vertex third) {
+    List<Vertex> vertices = Lists.newArrayList(first, second, third);
+    Collections.sort(vertices);
+    this.first = vertices.get(0);
+    this.second = vertices.get(1);
+    this.third = vertices.get(2);
+  }
+
+  public Triangle(long firstVertexId, long secondVertexId, long thirdVertexId) {
+    this(new Vertex(firstVertexId), new Vertex(secondVertexId), new Vertex(thirdVertexId));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    first.write(out);
+    second.write(out);
+    third.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    first = new Vertex();
+    first.readFields(in);
+    second = new Vertex();
+    second.readFields(in);
+    third = new Vertex();
+    third.readFields(in);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Triangle) {
+      Triangle other = (Triangle) o;
+      return first.equals(other.first) && second.equals(other.second) && third.equals(other.third);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 31 * first.hashCode() + second.hashCode();
+    return 31 * result + third.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "(" + first.getId() + "," + second.getId() + "," + third.getId() + ")";
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdge.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdge.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdge.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdge.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.hadoop.io.WritableComparable;
+
+/** A representation of an undirected edge */
+public class UndirectedEdge implements WritableComparable<UndirectedEdge>, Cloneable {
+
+  private Vertex first;
+  private Vertex second;
+
+  public UndirectedEdge() {}
+
+  public UndirectedEdge(Vertex first, Vertex second) {
+    if (first.getId() < second.getId()) {
+      this.first = first;
+      this.second = second;
+    } else {
+      this.first = second;
+      this.second = first;
+    }
+  }
+
+  public UndirectedEdge(long firstVertexID, long secondVertexID) {
+    this(new Vertex(firstVertexID), new Vertex(secondVertexID));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    first.write(out);
+    second.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    first = new Vertex();
+    first.readFields(in);
+    second = new Vertex();
+    second.readFields(in);
+  }
+
+  /** Returns true if the other instance is an edge containing the same vertices */
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof UndirectedEdge) {
+      UndirectedEdge other = (UndirectedEdge) o;
+      if (first.equals(other.first) && second.equals(other.second)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public Vertex getFirstVertex() {
+    return first;
+  }
+
+  public Vertex getSecondVertex() {
+    return second;
+  }
+
+  @Override
+  public int hashCode() {
+    return first.hashCode() + 31 * second.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "(" + first.getId() + "," + second.getId() + ")";
+  }
+
+  @Override
+  public int compareTo(UndirectedEdge other) {
+    return ComparisonChain.start()
+        .compare(first, other.first)
+        .compare(second, other.second).result();
+  }
+
+  @Override
+  public UndirectedEdge clone() {
+    return new UndirectedEdge(first.clone(), second.clone());
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdgeWithDegrees.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdgeWithDegrees.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdgeWithDegrees.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/UndirectedEdgeWithDegrees.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class UndirectedEdgeWithDegrees implements WritableComparable<UndirectedEdgeWithDegrees> {
+
+  private VertexWithDegree first;
+  private VertexWithDegree second;
+
+  public UndirectedEdgeWithDegrees() {}
+
+  public UndirectedEdgeWithDegrees(VertexWithDegree first, VertexWithDegree second) {
+    if (first.getVertex().compareTo(second.getVertex()) < 0) {
+      this.first = first;
+      this.second = second;
+    } else {
+      this.first = second;
+      this.second = first;
+    }
+  }
+
+  public UndirectedEdgeWithDegrees(long firstVertexId, int firstVertexDegree, long secondVertexId,
+      int secondVertexDegree) {
+    this(new VertexWithDegree(firstVertexId, firstVertexDegree), new VertexWithDegree(secondVertexId,
+        secondVertexDegree));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    first.write(out);
+    second.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    first = new VertexWithDegree();
+    first.readFields(in);
+    second = new VertexWithDegree();
+    second.readFields(in);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof UndirectedEdgeWithDegrees) {
+      UndirectedEdgeWithDegrees other = (UndirectedEdgeWithDegrees) o;
+      if (first.equals(other.first) && second.equals(other.second)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public VertexWithDegree getFirstVertexWithDegree() {
+    return first;
+  }
+
+  public VertexWithDegree getSecondVertexWithDegree() {
+    return second;
+  }
+
+  @Override
+  public int hashCode() {
+    return first.hashCode() + 31 * second.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "(" + first + ", " + second + ")";
+  }
+
+  @Override
+  public int compareTo(UndirectedEdgeWithDegrees other) {
+    return ComparisonChain.start()
+        .compare(first.getVertex(), other.first.getVertex())
+        .compare(second.getVertex(), other.second.getVertex()).result();
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Vertex.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Vertex.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Vertex.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Vertex.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.io.WritableComparable;
+
+/** models a vertex in a graph */
+public class Vertex implements WritableComparable<Vertex>, Cloneable {
+
+  private long id;
+
+  public Vertex() {}
+
+  public Vertex(long id) {
+    this.id = id;
+  }
+
+  public static Vertex read(DataInput in) throws IOException {
+    Vertex v = new Vertex();
+    v.readFields(in);
+    return v;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(id);
+  }
+
+  public long getId() {
+    return this.id;
+  }
+
+  /** Compares this instance to another according to the <code>id</code> attribute. */
+  @Override
+  public int compareTo(Vertex other) {
+    return Longs.compare(id, other.id);
+  }
+
+  /** Compares this instance to another according to the <code>id</code> attribute */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof Vertex) {
+      return ((Vertex) other).id == id;
+    }
+    return false;
+  }
+
+  @Override
+  public Vertex clone() {
+    return new Vertex(id);
+  }
+
+  /**
+   * The hash code of the <code>id</code> attribute
+   */
+  @Override
+  public int hashCode() {
+    return Longs.hashCode(id);
+  }
+  
+  @Override
+  public String toString() {
+    return "(" + id + ")";
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/VertexWithDegree.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/VertexWithDegree.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/VertexWithDegree.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/VertexWithDegree.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.model;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class VertexWithDegree implements Writable, Cloneable {
+
+  private Vertex vertex;
+  private int degree;
+
+  public VertexWithDegree() {}
+
+  public VertexWithDegree(Vertex vertex, int degree) {
+    this.vertex = vertex;
+    this.degree = degree;
+  }
+
+  public VertexWithDegree(long vertexId, int degree) {
+    this(new Vertex(vertexId), degree);
+  }
+
+  public Vertex getVertex() {
+    return vertex;
+  }
+
+  public int getDegree() {
+    return degree;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    vertex.write(out);
+    Varint.writeUnsignedVarInt(degree, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    vertex = new Vertex();
+    vertex.readFields(in);
+    degree = Varint.readUnsignedVarInt(in);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof VertexWithDegree) {
+      return vertex.equals(((VertexWithDegree) o).vertex);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return vertex.hashCode();
+  }
+
+  @Override
+  public VertexWithDegree clone() {
+    return new VertexWithDegree(vertex.clone(), degree);
+  }
+
+  @Override
+  public String toString() {
+    return "(" + vertex + "," + degree + ")";
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/package-info.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/package-info.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/package-info.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/package-info.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,2 @@
+/** Models for common graph elements */
+package org.apache.mahout.graph.model;

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/package-info.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/package-info.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/package-info.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/package-info.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,9 @@
+/**
+ * This package brings graph algorithms to <em>Mahout</em>.
+ * <ol>
+ * 	<li><code>org.apache.mahout.graph.common</code> contains chainable tools to prepare arbitrary graphs</li>
+ * 	<li><code>org.apache.mahout.graph.model</code> models several graph elements</li>
+ * 	<li><code>org.apache.mahout.graph.triangles</code> contains triangle enumeration code</li>
+ * </ol>
+ */
+package org.apache.mahout.graph;

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.triangles;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.UndirectedEdgeWithDegrees;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.graph.model.VertexWithDegree;
+
+/** Enumerates all triangles of an undirected graph. */
+public class EnumerateTrianglesJob extends AbstractJob {
+
+  /**
+   * Command line entry point; hands a new job instance and the raw arguments to {@link ToolRunner}.
+   *
+   * @param args command line arguments
+   * @throws Exception if running the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new EnumerateTrianglesJob(), args);
+  }
+
+  /**
+   * Runs the triangle enumeration as three consecutive MapReduce jobs: building open triads from the input edges,
+   * converting the same input edges into a joinable format, and joining both outputs to emit the triangles.
+   *
+   * @param args command line arguments, expected to carry the input, output and temp dir options
+   * @return 0 if all jobs completed successfully, -1 if the arguments could not be parsed or a job failed
+   * @throws Exception if a job cannot be set up or executed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+
+    Path inputPath = getInputPath();
+    Path joinableInputPath = new Path(tempDirPath, "joinableInput");
+    Path triadsPath = new Path(tempDirPath, "triangles");
+    Path outputPath = getOutputPath();
+
+    // scatter the edges to lower degree vertex and build open triads
+    Job scatter = prepareJob(inputPath, triadsPath, SequenceFileInputFormat.class,
+        ScatterEdgesToLowerDegreeVertexMapper.class, Vertex.class, Vertex.class,
+        BuildOpenTriadsReducer.class, JoinableUndirectedEdge.class, VertexOrMarker.class,
+        SequenceFileOutputFormat.class);
+    // stop early: the following jobs depend on the output of this one
+    if (!scatter.waitForCompletion(true)) {
+      return -1;
+    }
+
+    // necessary as long as we don't have access to an undeprecated MultipleInputs
+    Job prepareInput = prepareJob(inputPath, joinableInputPath, SequenceFileInputFormat.class, PrepareInputMapper.class,
+        JoinableUndirectedEdge.class, VertexOrMarker.class, Reducer.class, JoinableUndirectedEdge.class,
+        VertexOrMarker.class, SequenceFileOutputFormat.class);
+    prepareInput.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);
+    if (!prepareInput.waitForCompletion(true)) {
+      return -1;
+    }
+
+    // join open triads and edges pairwise to get all triangles
+    Job joinTriads = prepareJob(new Path(triadsPath + "," + joinableInputPath), outputPath,
+        SequenceFileInputFormat.class, Mapper.class, JoinableUndirectedEdge.class, VertexOrMarker.class,
+        JoinTrianglesReducer.class, Triangle.class, NullWritable.class, SequenceFileOutputFormat.class);
+    joinTriads.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);
+
+    return joinTriads.waitForCompletion(true) ? 0 : -1;
+  }
+
+  /**
+   * Bins every edge under its lower degree vertex: the vertex with the smaller degree becomes the key and the other
+   * end of the edge becomes the value. If both degrees are equal, the second vertex is used as the key.
+   */
+  public static class ScatterEdgesToLowerDegreeVertexMapper extends
+      Mapper<UndirectedEdgeWithDegrees,Object,Vertex,Vertex> {
+    @Override
+    protected void map(UndirectedEdgeWithDegrees edge, Object value, Context ctx)
+        throws IOException, InterruptedException {
+      VertexWithDegree firstEnd = edge.getFirstVertexWithDegree();
+      VertexWithDegree secondEnd = edge.getSecondVertexWithDegree();
+
+      // strict comparison: on a tie the second vertex is the one the edge is binned under
+      boolean firstIsLower = firstEnd.getDegree() < secondEnd.getDegree();
+      Vertex binVertex = firstIsLower ? firstEnd.getVertex() : secondEnd.getVertex();
+      Vertex otherVertex = firstIsLower ? secondEnd.getVertex() : firstEnd.getVertex();
+      ctx.write(binVertex, otherVertex);
+    }
+  }
+
+  /**
+   * Builds open triads from edges by pairwise joining the edges on the vertex they were binned under, which is the
+   * apex of the triad. Emits key-value pairs where the key is the (unmarked) missing edge between the two outside
+   * vertices and the value is the apex vertex.
+   */
+  public static class BuildOpenTriadsReducer extends Reducer<Vertex,Vertex,JoinableUndirectedEdge,VertexOrMarker> {
+    @Override
+    protected void reduce(Vertex vertex, Iterable<Vertex> vertices, Context ctx)
+        throws IOException, InterruptedException {
+      // ids of the neighbours seen so far; every new neighbour is paired with each of them once
+      FastIDSet bufferedVertexIDs = new FastIDSet();
+      VertexOrMarker apex = new VertexOrMarker(vertex);
+      for (Vertex firstVertexOfMissingEdge : vertices) {
+        LongPrimitiveIterator bufferedVertexIdsIterator = bufferedVertexIDs.iterator();
+        while (bufferedVertexIdsIterator.hasNext()) {
+          Vertex secondVertexOfMissingEdge = new Vertex(bufferedVertexIdsIterator.nextLong());
+          UndirectedEdge missingEdge = new UndirectedEdge(firstVertexOfMissingEdge, secondVertexOfMissingEdge);
+          ctx.write(new JoinableUndirectedEdge(missingEdge, false), apex);
+        }
+        // only the id is buffered, so the current neighbour is added after it has been paired
+        bufferedVertexIDs.add(firstVertexOfMissingEdge.getId());
+      }
+    }
+  }
+
+  /**
+   * Converts every input edge into a marked {@link JoinableUndirectedEdge} key with {@link VertexOrMarker#MARKER}
+   * as its value; the degrees attached to the vertices are dropped.
+   */
+  public static class PrepareInputMapper
+      extends Mapper<UndirectedEdgeWithDegrees,Object,JoinableUndirectedEdge,VertexOrMarker> {
+    @Override
+    protected void map(UndirectedEdgeWithDegrees edgeWithDegrees, Object value, Context ctx)
+        throws IOException, InterruptedException {
+      UndirectedEdge plainEdge = new UndirectedEdge(
+          edgeWithDegrees.getFirstVertexWithDegree().getVertex(),
+          edgeWithDegrees.getSecondVertexWithDegree().getVertex());
+      ctx.write(new JoinableUndirectedEdge(plainEdge, true), VertexOrMarker.MARKER);
+    }
+  }
+
+  /**
+   * Joins open triads with the edges of the graph. A group only yields triangles if its first value is the marker;
+   * in that case every remaining value is treated as the connecting vertex of an open triad that the edge closes.
+   */
+  public static class JoinTrianglesReducer
+      extends Reducer<JoinableUndirectedEdge,VertexOrMarker,Triangle,NullWritable> {
+    @Override
+    protected void reduce(JoinableUndirectedEdge joinableEdge, Iterable<VertexOrMarker> verticesAndMarker,
+        Context ctx) throws IOException, InterruptedException {
+      Iterator<VertexOrMarker> remaining = verticesAndMarker.iterator();
+      // no leading marker: nothing is emitted for this group
+      if (!remaining.next().isMarker()) {
+        return;
+      }
+
+      UndirectedEdge closingEdge = joinableEdge.getEdge();
+      while (remaining.hasNext()) {
+        Vertex apex = remaining.next().getVertex();
+        Triangle triangle = new Triangle(apex, closingEdge.getFirstVertex(), closingEdge.getSecondVertex());
+        ctx.write(triangle, NullWritable.get());
+      }
+    }
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/JoinableUndirectedEdge.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/JoinableUndirectedEdge.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/JoinableUndirectedEdge.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/JoinableUndirectedEdge.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.triangles;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+public class JoinableUndirectedEdge implements WritableComparable<JoinableUndirectedEdge> {
+
+  private UndirectedEdge edge;
+  private boolean marked;
+
+  static {
+    WritableComparator.define(JoinableUndirectedEdge.class, new SecondarySortComparator());
+  }
+
+  public JoinableUndirectedEdge() {}
+
+  public JoinableUndirectedEdge(UndirectedEdge edge, boolean marked) {
+    this.edge = edge;
+    this.marked = marked;
+  }
+
+  public JoinableUndirectedEdge(Vertex firstVertex, Vertex secondVertex, boolean marked) {
+    this(new UndirectedEdge(firstVertex, secondVertex), marked);
+  }
+
+  public JoinableUndirectedEdge(long firstVertexId, long secondVertexId, boolean marked) {
+    this(new UndirectedEdge(firstVertexId, secondVertexId), marked);
+  }
+
+  public UndirectedEdge getEdge() {
+    return edge;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    edge.write(out);
+    out.writeBoolean(marked);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    edge = new UndirectedEdge();
+    edge.readFields(in);
+    marked = in.readBoolean();
+  }
+
+  @Override
+  public int compareTo(JoinableUndirectedEdge other) {
+    return edge.compareTo(other.edge);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof JoinableUndirectedEdge) {
+      return edge.equals(((JoinableUndirectedEdge) o).edge);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return edge.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "(" + edge + "," +marked + ")";
+  }
+
+  /**
+   * Orders keys by their edge and, for equal edges, places marked keys before unmarked ones.
+   */
+  public static class SecondarySortComparator extends WritableComparator implements Serializable {
+
+    protected SecondarySortComparator() {
+      super(JoinableUndirectedEdge.class, true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      JoinableUndirectedEdge left = (JoinableUndirectedEdge) a;
+      JoinableUndirectedEdge right = (JoinableUndirectedEdge) b;
+
+      int byEdge = left.edge.compareTo(right.edge);
+      // the marker flag only breaks ties between keys that share the same edge
+      if (byEdge != 0 || left.marked == right.marked) {
+        return byEdge;
+      }
+      return left.marked ? -1 : 1;
+    }
+
+  }
+
+  public static class GroupingComparator extends WritableComparator implements Serializable {
+    protected GroupingComparator() {
+      super(JoinableUndirectedEdge.class, true);
+    }
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/VertexOrMarker.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/VertexOrMarker.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/VertexOrMarker.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/VertexOrMarker.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.triangles;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.graph.model.Vertex;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class VertexOrMarker implements Writable {
+
+  private boolean marker;
+  private Vertex connectingVertex;
+
+  public static final VertexOrMarker MARKER = new VertexOrMarker(true);
+
+  public VertexOrMarker() {}
+
+  public VertexOrMarker(Vertex vertex) {
+    connectingVertex = vertex;
+    marker = false;
+  }
+
+  public VertexOrMarker(long vertexId) {
+    this(new Vertex(vertexId));
+  }
+
+  /**
+   * Creates an instance with the given marker flag and no connecting vertex; used to build {@link #MARKER}.
+   *
+   * @param marker whether the new instance represents the marker
+   */
+  private VertexOrMarker(boolean marker) {
+    this.marker = marker;
+  }
+
+  public boolean isMarker() {
+    return marker;
+  }
+
+  public Vertex getVertex() {
+    return connectingVertex;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(marker);
+    if (!marker) {
+      connectingVertex.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    connectingVertex = null;
+    marker = in.readBoolean();
+    if (!marker) {
+      connectingVertex = new Vertex();
+      connectingVertex.readFields(in);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof VertexOrMarker) {
+      VertexOrMarker other = (VertexOrMarker) o;
+      if (marker && other.marker) {
+        return true;
+      } else if (marker || other.marker) {
+        return false;
+      } else {
+        return connectingVertex.equals(other.connectingVertex);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * (marker ? 1 : 0) + (connectingVertex != null ? connectingVertex.hashCode() : 0);
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/package-info.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/package-info.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/package-info.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/package-info.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,2 @@
+/** Triangle enumeration in a two-step MapReduce pipeline */
+package org.apache.mahout.graph.triangles;

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/TasteTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/TasteTestCase.java?rev=1137447&r1=1137446&r2=1137447&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/TasteTestCase.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/TasteTestCase.java Sun Jun 19 21:19:24 2011
@@ -74,14 +74,4 @@ public abstract class TasteTestCase exte
     return false;
   }
 
-  protected static void writeLines(File file, String... lines) throws FileNotFoundException {
-    PrintWriter writer = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8));
-    try {
-      for (String line : lines) {
-        writer.println(line);
-      }
-    } finally {
-      Closeables.closeQuietly(writer);
-    }
-  }
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java?rev=1137447&r1=1137446&r2=1137447&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java Sun Jun 19 21:19:24 2011
@@ -17,9 +17,11 @@
 
 package org.apache.mahout.common;
 
-import java.io.IOException;
+import java.io.*;
 import java.lang.reflect.Field;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -121,4 +123,15 @@ public abstract class MahoutTestCase ext
   protected static String optKey(String optionName) {
     return AbstractJob.keyFor(optionName);
   }
+
+  protected static void writeLines(File file, String... lines) throws FileNotFoundException {
+    PrintWriter writer = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8));
+    try {
+      for (String line : lines) {
+        writer.println(line);
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
 }

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJobTest.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJobTest.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.UndirectedEdgeWithDegrees;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.graph.model.VertexWithDegree;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class AugmentGraphWithDegreesJobTest extends MahoutTestCase {
+
+  @Test
+  public void testScatterEdges() throws Exception {
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(new Vertex(123), new Vertex(456));
+    ctx.write(new Vertex(456), new Vertex(123));
+
+    EasyMock.replay(ctx);
+
+    new AugmentGraphWithDegreesJob.ScatterEdgesMapper()
+        .map(new UndirectedEdge(new Vertex(123), new Vertex(456)), null, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void testSumDegrees() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+
+    Vertex vertex = new Vertex(1);
+
+    ctx.write(new UndirectedEdge(vertex, new Vertex(3)), new VertexWithDegree(vertex, 3));
+    ctx.write(new UndirectedEdge(vertex, new Vertex(5)), new VertexWithDegree(vertex, 3));
+    ctx.write(new UndirectedEdge(vertex, new Vertex(7)), new VertexWithDegree(vertex, 3));
+
+    EasyMock.replay(ctx);
+
+    new AugmentGraphWithDegreesJob.SumDegreesReducer()
+        .reduce(vertex, Arrays.asList(new Vertex(3), new Vertex(5), new Vertex(7)), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void testJoinDegrees() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+    Vertex first = new Vertex(1);
+    Vertex second = new Vertex(3);
+
+    ctx.write(new UndirectedEdgeWithDegrees(new VertexWithDegree(first, 1), new VertexWithDegree(second, 3)),
+        NullWritable.get());
+
+    EasyMock.replay(ctx);
+
+    new AugmentGraphWithDegreesJob.JoinDegreesReducer().reduce(new UndirectedEdge(first, second),
+        Arrays.asList(new VertexWithDegree(first, 1), new VertexWithDegree(second, 3)), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void toyIntegrationTest() throws Exception {
+    File inputFile = getTestTempFile("edges.seq");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(inputFile.getAbsolutePath().toString()),
+        UndirectedEdge.class, NullWritable.class);
+
+    try {
+      writer.append(new UndirectedEdge(0, 1), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 4), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 6), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 7), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(2, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 7), NullWritable.get());
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    AugmentGraphWithDegreesJob augmentGraphWithDegreesJob = new AugmentGraphWithDegreesJob();
+    augmentGraphWithDegreesJob.setConf(conf);
+    augmentGraphWithDegreesJob.run(new String[] { "--input", inputFile.getAbsolutePath(),
+        "--output", outputDir.getAbsolutePath(), "--tempDir", tempDir.getAbsolutePath() });
+
+    Set<UndirectedEdgeWithDegrees> edges = Sets.newHashSet();
+    for (Pair<UndirectedEdgeWithDegrees,NullWritable> result :
+        new SequenceFileIterable<UndirectedEdgeWithDegrees, NullWritable>(new Path(outputDir.getAbsolutePath() +
+        "/part-r-00000"), false, conf)) {
+      edges.add(result.getFirst());
+    }
+
+    assertEquals(12, edges.size());
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 1, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 2, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 3, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 4, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 5, 2)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 6, 1)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(0, 7, 7, 2)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(1, 3, 2, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(1, 3, 3, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(2, 3, 3, 3)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(4, 3, 5, 2)));
+    assertTrue(edges.contains(new UndirectedEdgeWithDegrees(4, 3, 7, 2)));
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/SimplifyGraphJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/SimplifyGraphJobTest.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/SimplifyGraphJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/SimplifyGraphJobTest.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class SimplifyGraphJobTest extends MahoutTestCase {
+
+  @Test
+  public void testEdgeMapping() throws Exception {
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(new UndirectedEdge(new Vertex(123L), new Vertex(456L)), NullWritable.get());
+
+    EasyMock.replay(ctx);
+
+    new SimplifyGraphJob.SimplifyGraphMapper().map(null, new Text("123,456"), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void testLoopRemoval() throws Exception {
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+
+    EasyMock.replay(ctx);
+
+    new SimplifyGraphJob.SimplifyGraphMapper().map(null, new Text("123,123"), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void testIgnoreUnparseableLines() throws Exception {
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+
+    EasyMock.replay(ctx);
+
+    new SimplifyGraphJob.SimplifyGraphMapper().map(null, new Text("abc,123"), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void testAggregation() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+    UndirectedEdge edge = new UndirectedEdge(new Vertex(123L), new Vertex(456L));
+
+    ctx.write(edge, NullWritable.get());
+
+    EasyMock.replay(ctx);
+
+    new SimplifyGraphJob.SimplifyGraphReducer().reduce(edge, Arrays.asList(NullWritable.get(), NullWritable.get()), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  @Test
+  public void toyIntegrationTest() throws Exception {
+    File inputFile = getTestTempFile("graph.txt");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    writeLines(inputFile,
+        "0,0",
+        "0,1",
+        "1,0",
+        "1,0",
+        "2,3",
+        "4,3",
+        "4,2");
+
+    Configuration conf = new Configuration();
+    SimplifyGraphJob simplifyGraphJob = new SimplifyGraphJob();
+    simplifyGraphJob.setConf(conf);
+    simplifyGraphJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--tempDir", tempDir.getAbsolutePath() });
+
+    Set<UndirectedEdge> edges = Sets.newHashSet();
+    for (Pair<UndirectedEdge,NullWritable> result :
+        new SequenceFileIterable<UndirectedEdge, NullWritable>(new Path(outputDir.getAbsolutePath() + "/part-r-00000"),
+        false, conf)) {
+      edges.add(result.getFirst());
+    }
+
+    assertEquals(4, edges.size());
+    assertTrue(edges.contains(new UndirectedEdge(1, 0)));
+    assertTrue(edges.contains(new UndirectedEdge(2, 3)));
+    assertTrue(edges.contains(new UndirectedEdge(2, 4)));
+    assertTrue(edges.contains(new UndirectedEdge(3, 4)));
+  }
+
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJobTest.java?rev=1137447&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJobTest.java Sun Jun 19 21:19:24 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.triangles;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdgeWithDegrees;
+import org.apache.mahout.graph.model.Vertex;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+/** Tests the mapper and reducers of {@code EnumerateTrianglesJob} in isolation and the job on a toy graph. */
+public class EnumerateTrianglesJobTest extends MahoutTestCase {
+
+  /** Expects the edge created from (1, 5, 3, 7) to be written with vertex 1 as key and vertex 3 as value. */
+  @Test
+  public void testScatterEdgesToLowerVertexDegree() throws Exception {
+    Mapper.Context ctx = EasyMock.createMock(Mapper.Context.class);
+    ctx.write(new Vertex(1), new Vertex(3));
+    EasyMock.replay(ctx);
+
+    new EnumerateTrianglesJob.ScatterEdgesToLowerDegreeVertexMapper()
+        .map(new UndirectedEdgeWithDegrees(1, 5, 3, 7), null, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /** Expects key vertex 0 with values 1, 2, 3 to produce the edges 1-2, 1-3 and 2-3, each with vertex 0 as value. */
+  @Test
+  public void testBuildOpenTriads() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+    ctx.write(new JoinableUndirectedEdge(1, 2, false), new VertexOrMarker(0));
+    ctx.write(new JoinableUndirectedEdge(1, 3, false), new VertexOrMarker(0));
+    ctx.write(new JoinableUndirectedEdge(2, 3, false), new VertexOrMarker(0));
+    EasyMock.replay(ctx);
+
+    new EnumerateTrianglesJob.BuildOpenTriadsReducer().reduce(new Vertex(0), Arrays.asList(new Vertex(1), new Vertex(2),
+        new Vertex(3)), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /** Expects edge 0-2 with the marker plus vertices 1 and 3 as values to produce triangles (0,1,2) and (0,2,3). */
+  @Test
+  public void testJoinTriangles() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+    ctx.write(new Triangle(0, 1, 2), NullWritable.get());
+    ctx.write(new Triangle(0, 2, 3), NullWritable.get());
+    EasyMock.replay(ctx);
+
+    new EnumerateTrianglesJob.JoinTrianglesReducer().reduce(new JoinableUndirectedEdge(0, 2, true),
+        Arrays.asList(VertexOrMarker.MARKER, new VertexOrMarker(1), new VertexOrMarker(3)), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /** Expects nothing to be written when the values for edge 0-2 do not contain the marker. */
+  @Test
+  public void testJoinTrianglesNoTriangleIfMarkerIsMissing() throws Exception {
+    Reducer.Context ctx = EasyMock.createMock(Reducer.Context.class);
+    EasyMock.replay(ctx);
+
+    new EnumerateTrianglesJob.JoinTrianglesReducer().reduce(new JoinableUndirectedEdge(0, 2, false),
+        Arrays.asList(new VertexOrMarker(1), new VertexOrMarker(3)), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /** Writes twelve degree-augmented edges to a sequence file, runs the job on it and expects six triangles. */
+  @Test
+  public void toyIntegrationTest() throws Exception {
+    File inputFile = getTestTempFile("edgesWithDegrees.seq");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    Configuration conf = new Configuration();
+    SequenceFile.Writer writer = new SequenceFile.Writer(FileSystem.get(conf), conf,
+        new Path(inputFile.getAbsolutePath()), UndirectedEdgeWithDegrees.class, NullWritable.class);
+
+    // a failed close() can leave the input incomplete, so it is only suppressed when the try block already threw
+    boolean threw = true;
+    try {
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 1, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 2, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 3, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 4, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 5, 2), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 6, 1), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(0, 7, 7, 2), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(1, 3, 2, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(1, 3, 3, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(2, 3, 3, 3), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(4, 3, 5, 2), NullWritable.get());
+      writer.append(new UndirectedEdgeWithDegrees(4, 3, 7, 2), NullWritable.get());
+      threw = false;
+    } finally {
+      Closeables.close(writer, threw);
+    }
+
+    EnumerateTrianglesJob enumerateTrianglesJob = new EnumerateTrianglesJob();
+    enumerateTrianglesJob.setConf(conf);
+    enumerateTrianglesJob.run(new String[] { "--input", inputFile.getAbsolutePath(),
+        "--output", outputDir.getAbsolutePath(), "--tempDir", tempDir.getAbsolutePath() });
+
+    Set<Triangle> triangles = Sets.newHashSet();
+    for (Pair<Triangle,NullWritable> result :
+        new SequenceFileIterable<Triangle, NullWritable>(new Path(outputDir.getAbsolutePath() + "/part-r-00000"),
+        false, conf)) {
+      triangles.add(result.getFirst());
+    }
+
+    assertEquals(6, triangles.size());
+    assertTrue(triangles.contains(new Triangle(0, 1, 2)));
+    assertTrue(triangles.contains(new Triangle(0, 1, 3)));
+    assertTrue(triangles.contains(new Triangle(0, 2, 3)));
+    assertTrue(triangles.contains(new Triangle(0, 4, 5)));
+    assertTrue(triangles.contains(new Triangle(0, 4, 7)));
+    assertTrue(triangles.contains(new Triangle(1, 2, 3)));
+  }
+}



Mime
View raw message