mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1138329 - in /mahout/trunk/core/src: main/java/org/apache/mahout/common/mapreduce/SumReducer.java main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Date Wed, 22 Jun 2011 07:58:48 GMT
Author: ssc
Date: Wed Jun 22 07:58:47 2011
New Revision: 1138329

URL: http://svn.apache.org/viewvc?rev=1138329&view=rev
Log:
MAHOUT-739 MapReduce job to compute the degree distribution of an undirected graph

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java?rev=1138329&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java Wed Jun 22 07:58:47 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class SumReducer extends Reducer<WritableComparable<?>,IntWritable,WritableComparable<?>,IntWritable>
{
+    @Override
+    protected void reduce(WritableComparable<?> key, Iterable<IntWritable> counts,
Context ctx)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable count : counts) {
+        sum += count.get();
+      }
+      ctx.write(key, new IntWritable(sum));
+    }
+  }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1138329&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java Wed Jun 22 07:58:47 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.mapreduce.SumReducer;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * <p>Distributed computation of the distribution of degrees of an undirected graph</p>
+ *
+ * <p>The input file needs to be a {@link org.apache.hadoop.io.SequenceFile} with {@link
UndirectedEdge}s as keys and
+ * any Writable as values, as it is already produced by {@link SimplifyGraphJob}</p>
+ *
+ * <p>This job outputs text files with a degree and the number of nodes having that
degree per line.</p>
+ *
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>-Dmapred.input.dir=(path): Directory containing one or more sequence files with
edge data</li>
+ * <li>-Dmapred.output.dir=(path): output path where the degree distribution data should
be written</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear
before all other arguments.</p>
+ */
+public class DegreeDistributionJob extends AbstractJob {
+
+  private static final IntWritable ONE = new IntWritable(1);
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DegreeDistributionJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+    Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex");
+
+    Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath, SequenceFileInputFormat.class,
+        DegreeOfVertexMapper.class, Vertex.class, IntWritable.class, SumReducer.class, Vertex.class,
IntWritable.class,
+        SequenceFileOutputFormat.class);
+    degreesPerVertex.setCombinerClass(SumReducer.class);
+    degreesPerVertex.waitForCompletion(true);
+
+    Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(), SequenceFileInputFormat.class,
+        DegreesMapper.class, IntWritable.class, IntWritable.class, SumReducer.class, IntWritable.class,
+        IntWritable.class, TextOutputFormat.class);
+    degreeDistribution.setCombinerClass(SumReducer.class);
+    degreeDistribution.waitForCompletion(true);
+
+    return 0;
+  }
+
+  public static class DegreeOfVertexMapper extends Mapper<UndirectedEdge,Writable,Vertex,IntWritable>
{
+    @Override
+    protected void map(UndirectedEdge edge, Writable value, Context ctx) throws IOException,
InterruptedException {
+      ctx.write(edge.getFirstVertex(), ONE);
+      ctx.write(edge.getSecondVertex(), ONE);
+    }
+  }
+
+  public static class DegreesMapper extends Mapper<Vertex,IntWritable,IntWritable,IntWritable>
{
+    @Override
+    protected void map(Vertex vertex, IntWritable degree, Context ctx) throws IOException,
InterruptedException {
+      ctx.write(degree, ONE);
+    }
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java?rev=1138329&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java Wed Jun 22 07:58:47 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class DegreeDistributionJobTest extends MahoutTestCase {
+
+@Test
+  public void toyIntegrationTest() throws Exception {
+
+    File inputFile = getTestTempFile("edges.seq");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(inputFile.getAbsolutePath().toString()),
+        UndirectedEdge.class, NullWritable.class);
+
+    try {
+      writer.append(new UndirectedEdge(0, 1), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 4), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 6), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 7), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(2, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 7), NullWritable.get());
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    DegreeDistributionJob degreeDistributionJob = new DegreeDistributionJob();
+    degreeDistributionJob.setConf(conf);
+    degreeDistributionJob.run(new String[] { "--input", inputFile.getAbsolutePath(),
+        "--output", outputDir.getAbsolutePath(), "--tempDir", tempDir.getAbsolutePath() });
+
+    Map<Integer,Integer> degreeDistribution = Maps.newHashMap();
+    for (String line : new FileLineIterable(new File(outputDir, "part-r-00000"))) {
+      String[] tokens = Iterables.toArray(Splitter.on("\t").split(line), String.class);
+      degreeDistribution.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+    }
+
+    assertEquals(4, degreeDistribution.size());
+    assertEquals(1, degreeDistribution.get(1).intValue());
+    assertEquals(2, degreeDistribution.get(2).intValue());
+    assertEquals(4, degreeDistribution.get(3).intValue());
+    assertEquals(1, degreeDistribution.get(7).intValue());
+  }
+
+}



Mime
View raw message