mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1142668 - in /mahout/trunk/core/src/main/java/org/apache/mahout: common/ graph/common/ graph/triangles/
Date Mon, 04 Jul 2011 13:59:13 GMT
Author: ssc
Date: Mon Jul  4 13:59:13 2011
New Revision: 1142668

URL: http://svn.apache.org/viewvc?rev=1142668&view=rev
Log:
small cleanup of tmp directory management in the graph jobs

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jul  4 13:59:13 2011
@@ -131,6 +131,10 @@ public abstract class AbstractJob extend
     return new Path(tempPath, directory);
   }
 
+  protected Path getCombinedTempPath(String directory1, String directory2) {
+    return new Path(new Path(tempPath, directory1) + "," + new Path(tempPath, directory2));
+  }
+
   /** Add an option with no argument whose presence can be checked for using
    *  {@code containsKey} method on the map returned by {@link #parseArguments(String[])};
    */

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java Mon Jul  4 13:59:13 2011
@@ -43,6 +43,8 @@ import org.apache.mahout.graph.model.Ver
  */
 public class AugmentGraphWithDegreesJob extends AbstractJob {
 
+  public static final String TMP_AUGMENTED_EDGES = "augmented-edges";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new AugmentGraphWithDegreesJob(), args);
   }
@@ -58,22 +60,19 @@ public class AugmentGraphWithDegreesJob 
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
     Path inputPath = getInputPath();
-    Path augmentedEdgesPath = new Path(tempDirPath, "augmented-edges");
     Path outputPath = getOutputPath();
 
     // scatter the edges to each of the vertices and count degree
-    Job scatter = prepareJob(inputPath, augmentedEdgesPath, SequenceFileInputFormat.class, ScatterEdgesMapper.class,
-        Vertex.class, Vertex.class, SumDegreesReducer.class, UndirectedEdge.class, VertexWithDegree.class,
-        SequenceFileOutputFormat.class);
+    Job scatter = prepareJob(inputPath, getTempPath(TMP_AUGMENTED_EDGES), SequenceFileInputFormat.class,
+        ScatterEdgesMapper.class, Vertex.class, Vertex.class, SumDegreesReducer.class, UndirectedEdge.class,
+        VertexWithDegree.class, SequenceFileOutputFormat.class);
     scatter.waitForCompletion(true);
 
     // join augmented edges with partial degree information to to complete records
-    Job join = prepareJob(augmentedEdgesPath, outputPath, SequenceFileInputFormat.class, Mapper.class,
-        UndirectedEdge.class, VertexWithDegree.class, JoinDegreesReducer.class, UndirectedEdgeWithDegrees.class,
-        NullWritable.class, SequenceFileOutputFormat.class);
+    Job join = prepareJob(getTempPath(TMP_AUGMENTED_EDGES), outputPath, SequenceFileInputFormat.class,
+        Mapper.class, UndirectedEdge.class, VertexWithDegree.class, JoinDegreesReducer.class,
+        UndirectedEdgeWithDegrees.class, NullWritable.class, SequenceFileOutputFormat.class);
     join.waitForCompletion(true);
 
     return 0;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java Mon Jul  4 13:59:13 2011
@@ -17,7 +17,6 @@
 
 package org.apache.mahout.graph.common;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
@@ -56,6 +55,8 @@ import java.util.Map;
  */
 public class DegreeDistributionJob extends AbstractJob {
 
+  public static final String TMP_DEGREES_PER_VERTEX = "degreesPerVertex";
+
   private static final IntWritable ONE = new IntWritable(1);
 
   public static void main(String[] args) throws Exception {
@@ -72,18 +73,15 @@ public class DegreeDistributionJob exten
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-    Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex");
-
-    Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath, SequenceFileInputFormat.class,
-        DegreeOfVertexMapper.class, Vertex.class, IntWritable.class, IntSumReducer.class, Vertex.class,
-        IntWritable.class, SequenceFileOutputFormat.class);
+    Job degreesPerVertex = prepareJob(getInputPath(), getTempPath(TMP_DEGREES_PER_VERTEX),
+        SequenceFileInputFormat.class, DegreeOfVertexMapper.class, Vertex.class, IntWritable.class, IntSumReducer.class,
+        Vertex.class, IntWritable.class, SequenceFileOutputFormat.class);
     degreesPerVertex.setCombinerClass(IntSumReducer.class);
     degreesPerVertex.waitForCompletion(true);
 
-    Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(), SequenceFileInputFormat.class,
-        DegreesMapper.class, IntWritable.class, IntWritable.class, IntSumReducer.class, IntWritable.class,
-        IntWritable.class, TextOutputFormat.class);
+    Job degreeDistribution = prepareJob(getTempPath(TMP_DEGREES_PER_VERTEX), getOutputPath(),
+        SequenceFileInputFormat.class, DegreesMapper.class, IntWritable.class, IntWritable.class, IntSumReducer.class,
+        IntWritable.class, IntWritable.class, TextOutputFormat.class);
     degreeDistribution.setCombinerClass(IntSumReducer.class);
     degreeDistribution.waitForCompletion(true);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java Mon Jul  4 13:59:13 2011
@@ -64,6 +64,9 @@ import java.util.Map;
  */
 public class LocalClusteringCoefficientJob extends AbstractJob {
 
+  public static final String TMP_EDGES_PER_VERTEX = "edgesPerVertex";
+  public static final String TMP_TRIANGLES_PER_VERTEX = "trianglesPerVertex";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new LocalClusteringCoefficientJob(), args);
   }
@@ -83,27 +86,23 @@ public class LocalClusteringCoefficientJ
     Path edgesPath = new Path(parsedArgs.get("--edges"));
     Path trianglesPath = new Path(parsedArgs.get("--triangles"));
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
-    Path edgesPerVertex = new Path(tempDirPath, "edgesPerVertex");
-    Path trianglesPerVertex = new Path(tempDirPath, "trianglesPerVertex");
-
     // unfortunately we don't have access to an undeprecated MultipleInputs, so we need several M/R steps instead of one...
-    Job countEdgesPerVertex = prepareJob(edgesPath, edgesPerVertex, SequenceFileInputFormat.class,
-        EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
-        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    Job countEdgesPerVertex = prepareJob(edgesPath, getTempPath(TMP_EDGES_PER_VERTEX),
+        SequenceFileInputFormat.class, EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class,
+        Vertex.class, TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
     countEdgesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     countEdgesPerVertex.waitForCompletion(true);
 
-    Job countTrianglesPerVertex = prepareJob(trianglesPath, trianglesPerVertex, SequenceFileInputFormat.class,
-        TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
-        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    Job countTrianglesPerVertex = prepareJob(trianglesPath, getTempPath(TMP_TRIANGLES_PER_VERTEX),
+        SequenceFileInputFormat.class, TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class,
+        Reducer.class, Vertex.class, TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
     countTrianglesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     countTrianglesPerVertex.waitForCompletion(true);
 
-    Job computeLocalClusteringCoefficient = prepareJob(new Path(edgesPerVertex + "," + trianglesPerVertex),
-        getOutputPath(), SequenceFileInputFormat.class, Mapper.class, Vertex.class, TriangleOrEdgeCount.class,
-        LocalClusteringCoefficientReducer.class, LongWritable.class, DoubleWritable.class, TextOutputFormat.class);
+    Job computeLocalClusteringCoefficient = prepareJob(getCombinedTempPath(TMP_EDGES_PER_VERTEX,
+        TMP_TRIANGLES_PER_VERTEX), getOutputPath(), SequenceFileInputFormat.class, Mapper.class,
+        Vertex.class, TriangleOrEdgeCount.class, LocalClusteringCoefficientReducer.class, LongWritable.class,
+        DoubleWritable.class, TextOutputFormat.class);
     computeLocalClusteringCoefficient.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     computeLocalClusteringCoefficient.waitForCompletion(true);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java Mon Jul  4 13:59:13 2011
@@ -20,7 +20,6 @@ package org.apache.mahout.graph.common;
 import java.io.IOException;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -64,10 +63,7 @@ public class SimplifyGraphJob extends Ab
       return -1;
     }
 
-    Path inputPath = getInputPath();
-    Path outputPath = getOutputPath();
-
-    Job simplify = prepareJob(inputPath, outputPath, TextInputFormat.class, SimplifyGraphMapper.class,
+    Job simplify = prepareJob(getInputPath(), getOutputPath(), TextInputFormat.class, SimplifyGraphMapper.class,
         UndirectedEdge.class, NullWritable.class, SimplifyGraphReducer.class, UndirectedEdge.class, NullWritable.class,
         SequenceFileOutputFormat.class);
     simplify.waitForCompletion(true);
@@ -79,6 +75,7 @@ public class SimplifyGraphJob extends Ab
   public static class SimplifyGraphMapper extends Mapper<Object, Text, UndirectedEdge, NullWritable> {
 
     private static final Pattern SEPARATOR = Pattern.compile(",");
+
     @Override
     public void map(Object key, Text line, Context ctx) throws IOException, InterruptedException {
       try {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java Mon Jul  4 13:59:13 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -41,6 +40,9 @@ import org.apache.mahout.graph.model.Ver
 /** Enumerates all triangles of an undirected graph. */
 public class EnumerateTrianglesJob extends AbstractJob {
 
+  public static final String TMP_CLOSING_EDGES = "closingEdges";
+  public static final String TMP_OPEN_TRIADS = "openTriads";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new EnumerateTrianglesJob(), args);
   }
@@ -55,29 +57,22 @@ public class EnumerateTrianglesJob exten
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
-    Path inputPath = getInputPath();
-    Path joinableInputPath = new Path(tempDirPath, "joinableInput");
-    Path triadsPath = new Path(tempDirPath, "triangles");
-    Path outputPath = getOutputPath();
-
     // scatter the edges to lower degree vertex and build open triads
-    Job scatter = prepareJob(inputPath, triadsPath, SequenceFileInputFormat.class,
+    Job scatter = prepareJob(getInputPath(), getTempPath(TMP_OPEN_TRIADS), SequenceFileInputFormat.class,
         ScatterEdgesToLowerDegreeVertexMapper.class, Vertex.class, Vertex.class,
         BuildOpenTriadsReducer.class, JoinableUndirectedEdge.class, VertexOrMarker.class,
         SequenceFileOutputFormat.class);
     scatter.waitForCompletion(true);
 
     // necessary as long as we don't have access to an undeprecated MultipleInputs
-    Job prepareInput = prepareJob(inputPath, joinableInputPath, SequenceFileInputFormat.class, PrepareInputMapper.class,
-        JoinableUndirectedEdge.class, VertexOrMarker.class, Reducer.class, JoinableUndirectedEdge.class,
-        VertexOrMarker.class, SequenceFileOutputFormat.class);
+    Job prepareInput = prepareJob(getInputPath(), getTempPath(TMP_CLOSING_EDGES), SequenceFileInputFormat.class,
+        PrepareInputMapper.class, JoinableUndirectedEdge.class, VertexOrMarker.class, Reducer.class,
+        JoinableUndirectedEdge.class, VertexOrMarker.class, SequenceFileOutputFormat.class);
     prepareInput.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);
     prepareInput.waitForCompletion(true);
 
     //join opentriads and edges pairwise to get all triangles
-    Job joinTriads = prepareJob(new Path(triadsPath + "," + joinableInputPath), outputPath,
+    Job joinTriads = prepareJob(getCombinedTempPath(TMP_OPEN_TRIADS, TMP_CLOSING_EDGES), getOutputPath(),
         SequenceFileInputFormat.class, Mapper.class, JoinableUndirectedEdge.class, VertexOrMarker.class,
         JoinTrianglesReducer.class, Triangle.class, NullWritable.class, SequenceFileOutputFormat.class);
     joinTriads.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);



Mime
View raw message