Return-Path: X-Original-To: apmail-mahout-commits-archive@www.apache.org Delivered-To: apmail-mahout-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A059F6AE7 for ; Wed, 22 Jun 2011 07:59:15 +0000 (UTC) Received: (qmail 69719 invoked by uid 500); 22 Jun 2011 07:59:15 -0000 Delivered-To: apmail-mahout-commits-archive@mahout.apache.org Received: (qmail 69200 invoked by uid 500); 22 Jun 2011 07:59:13 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 69191 invoked by uid 99); 22 Jun 2011 07:59:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jun 2011 07:59:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jun 2011 07:59:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4DE4423889D5; Wed, 22 Jun 2011 07:58:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1138329 - in /mahout/trunk/core/src: main/java/org/apache/mahout/common/mapreduce/SumReducer.java main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java Date: Wed, 22 Jun 2011 07:58:48 -0000 To: commits@mahout.apache.org From: ssc@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110622075848.4DE4423889D5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ssc Date: Wed Jun 22 07:58:47 2011 New Revision: 1138329 URL: http://svn.apache.org/viewvc?rev=1138329&view=rev Log: MAHOUT-739 
MapReduce job to compute the degree distribution of an undirected graph Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java?rev=1138329&view=auto ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java (added) +++ mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java Wed Jun 22 07:58:47 2011 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.mapreduce; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +public class SumReducer extends Reducer,IntWritable,WritableComparable,IntWritable> { + @Override + protected void reduce(WritableComparable key, Iterable counts, Context ctx) + throws IOException, InterruptedException { + int sum = 0; + for (IntWritable count : counts) { + sum += count.get(); + } + ctx.write(key, new IntWritable(sum)); + } + } Added: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1138329&view=auto ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java (added) +++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java Wed Jun 22 07:58:47 2011 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.graph.common; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.mapreduce.SumReducer; +import org.apache.mahout.graph.model.UndirectedEdge; +import org.apache.mahout.graph.model.Vertex; + +import java.io.IOException; +import java.util.Map; + +/** + *

Distributed computation of the distribution of degrees of an undirected graph

+ * + *

The input file needs to be a {@link org.apache.hadoop.io.SequenceFile} with {@link UndirectedEdge}s as keys and + * any Writable as values, which is the format already produced by {@link SimplifyGraphJob}.

+ * + *

This job outputs text files in which each line contains a degree and the number of vertices having that degree, separated by a tab.

+ * + * + *

Command line arguments specific to this class are:

+ * + *
    + *
  1. -Dmapred.input.dir=(path): Directory containing one or more sequence files with edge data
+ *
  2. -Dmapred.output.dir=(path): output path where the degree distribution data should be written
+ *
+ * + *

General command line options are documented in {@link AbstractJob}.

+ * + *

Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other arguments.

+ */ +public class DegreeDistributionJob extends AbstractJob { + + private static final IntWritable ONE = new IntWritable(1); + + public static void main(String[] args) throws Exception { + ToolRunner.run(new DegreeDistributionJob(), args); + } + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + + Map parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + + Path tempDirPath = new Path(parsedArgs.get("--tempDir")); + Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex"); + + Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath, SequenceFileInputFormat.class, + DegreeOfVertexMapper.class, Vertex.class, IntWritable.class, SumReducer.class, Vertex.class, IntWritable.class, + SequenceFileOutputFormat.class); + degreesPerVertex.setCombinerClass(SumReducer.class); + degreesPerVertex.waitForCompletion(true); + + Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(), SequenceFileInputFormat.class, + DegreesMapper.class, IntWritable.class, IntWritable.class, SumReducer.class, IntWritable.class, + IntWritable.class, TextOutputFormat.class); + degreeDistribution.setCombinerClass(SumReducer.class); + degreeDistribution.waitForCompletion(true); + + return 0; + } + + public static class DegreeOfVertexMapper extends Mapper { + @Override + protected void map(UndirectedEdge edge, Writable value, Context ctx) throws IOException, InterruptedException { + ctx.write(edge.getFirstVertex(), ONE); + ctx.write(edge.getSecondVertex(), ONE); + } + } + + public static class DegreesMapper extends Mapper { + @Override + protected void map(Vertex vertex, IntWritable degree, Context ctx) throws IOException, InterruptedException { + ctx.write(degree, ONE); + } + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java?rev=1138329&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java Wed Jun 22 07:58:47 2011 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.graph.common; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.apache.mahout.graph.model.UndirectedEdge; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +public class DegreeDistributionJobTest extends MahoutTestCase { + +@Test + public void toyIntegrationTest() throws Exception { + + File inputFile = getTestTempFile("edges.seq"); + File outputDir = getTestTempDir("output"); + outputDir.delete(); + File tempDir = getTestTempDir("tmp"); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(inputFile.getAbsolutePath().toString()), + UndirectedEdge.class, NullWritable.class); + + try { + writer.append(new UndirectedEdge(0, 1), NullWritable.get()); + writer.append(new UndirectedEdge(0, 2), NullWritable.get()); + writer.append(new UndirectedEdge(0, 3), NullWritable.get()); + writer.append(new UndirectedEdge(0, 4), NullWritable.get()); + writer.append(new UndirectedEdge(0, 5), NullWritable.get()); + writer.append(new UndirectedEdge(0, 6), NullWritable.get()); + writer.append(new UndirectedEdge(0, 7), NullWritable.get()); + writer.append(new UndirectedEdge(1, 2), NullWritable.get()); + writer.append(new UndirectedEdge(1, 3), NullWritable.get()); + writer.append(new UndirectedEdge(2, 3), NullWritable.get()); + writer.append(new UndirectedEdge(4, 5), NullWritable.get()); + writer.append(new UndirectedEdge(4, 7), NullWritable.get()); + } finally { + 
Closeables.closeQuietly(writer); + } + + DegreeDistributionJob degreeDistributionJob = new DegreeDistributionJob(); + degreeDistributionJob.setConf(conf); + degreeDistributionJob.run(new String[] { "--input", inputFile.getAbsolutePath(), + "--output", outputDir.getAbsolutePath(), "--tempDir", tempDir.getAbsolutePath() }); + + Map degreeDistribution = Maps.newHashMap(); + for (String line : new FileLineIterable(new File(outputDir, "part-r-00000"))) { + String[] tokens = Iterables.toArray(Splitter.on("\t").split(line), String.class); + degreeDistribution.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1])); + } + + assertEquals(4, degreeDistribution.size()); + assertEquals(1, degreeDistribution.get(1).intValue()); + assertEquals(2, degreeDistribution.get(2).intValue()); + assertEquals(4, degreeDistribution.get(3).intValue()); + assertEquals(1, degreeDistribution.get(7).intValue()); + } + +}