giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apre...@apache.org
Subject git commit: GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta)
Date Thu, 17 Jan 2013 00:05:21 GMT
Updated Branches:
  refs/heads/trunk e1a7f2905 -> 8c42e3f9b


GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8c42e3f9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8c42e3f9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8c42e3f9

Branch: refs/heads/trunk
Commit: 8c42e3f9b6844668c029778bcd97603459ec98fc
Parents: e1a7f29
Author: Alessandro Presta <alessandro@fb.com>
Authored: Wed Jan 16 15:43:17 2013 -0800
Committer: Alessandro Presta <alessandro@fb.com>
Committed: Wed Jan 16 16:05:05 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    4 +-
 .../main/java/org/apache/giraph/GiraphRunner.java  |  115 +++++++++------
 .../apache/giraph/graph/GiraphTypeValidator.java   |   85 ++++++++---
 3 files changed, 131 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6c2c5e6..5fbda9a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,7 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
-  GIRAPH-Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
+  GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta)
+
+  GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
 
   GIRAPH-459: Group Vertex Mutations by Partition ID (claudio)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 1edb262..3d0e634 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -17,27 +17,30 @@
  */
 package org.apache.giraph;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.examples.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.examples.Algorithm;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GiraphTypeValidator;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.utils.AnnotationUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -45,9 +48,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
 import java.net.URI;
 import java.util.List;
 
@@ -66,13 +66,6 @@ public class GiraphRunner implements Tool {
   private Configuration conf;
 
   /**
-   * Required options.
-   */
-  private final String [][] requiredOptions =
-  {{"w", "Need to choose the number of workers (-w)"},
-   {"if", "Need to set inputformat (-if)"}};
-
-  /**
    * Get the options available.
    *
    * @return Options available.
@@ -84,10 +77,12 @@ public class GiraphRunner implements Tool {
         "algorithms");
     options.addOption("q", "quiet", false, "Quiet output");
     options.addOption("w", "workers", true, "Number of workers");
-    options.addOption("if", "inputFormat", true, "Graph inputformat");
-    options.addOption("of", "outputFormat", true, "Graph outputformat");
-    options.addOption("ip", "inputPath", true, "Graph input path");
-    options.addOption("op", "outputPath", true, "Graph output path");
+    options.addOption("vif", "vertexInputFormat", true, "Vertex input format");
+    options.addOption("eif", "edgeInputFormat", true, "Edge input format");
+    options.addOption("of", "outputFormat", true, "Vertex output format");
+    options.addOption("vip", "vertexInputPath", true, "Vertex input path");
+    options.addOption("eip", "edgeInputPath", true, "Edge input path");
+    options.addOption("op", "outputPath", true, "Vertex output path");
     options.addOption("c", "combiner", true, "Combiner class");
     options.addOption("wc", "workerContext", true, "WorkerContext class");
     options.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
@@ -157,47 +152,73 @@ public class GiraphRunner implements Tool {
       LOG.debug("Attempting to run Vertex: " + vertexClassName);
     }
 
-    // Verify all the options have been provided
-    for (String[] requiredOption : requiredOptions) {
-      if (!cmd.hasOption(requiredOption[0])) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info(requiredOption[1]);
-        }
-        return -1;
+    // Verify all the required options have been provided
+    if (!cmd.hasOption("w")) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Need to choose the number of workers (-w)");
+      }
+      return -1;
+    }
+    if (!cmd.hasOption("vif") && !cmd.hasOption("eif")) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Need to set an input format (-vif or -eif)");
       }
+      return -1;
     }
 
     int workers = Integer.parseInt(cmd.getOptionValue('w'));
+
     GiraphConfiguration giraphConfiguration = new GiraphConfiguration(
             getConf());
+
     giraphConfiguration.setVertexClass(
         (Class<? extends Vertex>) Class.forName(vertexClassName));
-    giraphConfiguration.setVertexInputFormatClass(
-        (Class<? extends VertexInputFormat>)
-            Class.forName(cmd.getOptionValue("if")));
-    giraphConfiguration.setVertexOutputFormatClass(
-        (Class<? extends VertexOutputFormat>)
-            Class.forName(cmd.getOptionValue("of")));
+
     GiraphJob job = new GiraphJob(
         giraphConfiguration, "Giraph: " + vertexClassName);
 
-    if (cmd.hasOption("ip")) {
-      FileInputFormat.addInputPath(job.getInternalJob(),
-          new Path(cmd.getOptionValue("ip")));
-    } else {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("No input path specified. Ensure your InputFormat does" +
-            " not require one.");
+    if (cmd.hasOption("vif")) {
+      giraphConfiguration.setVertexInputFormatClass(
+          (Class<? extends VertexInputFormat>)
+              Class.forName(cmd.getOptionValue("vif")));
+      if (cmd.hasOption("vip")) {
+        GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(),
+            new Path(cmd.getOptionValue("vip")));
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("No vertex input path specified. Ensure your " +
+              "VertexInputFormat does not require one.");
+        }
       }
     }
 
-    if (cmd.hasOption("op")) {
-      FileOutputFormat.setOutputPath(job.getInternalJob(),
-                                     new Path(cmd.getOptionValue("op")));
-    } else {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("No output path specified. Ensure your OutputFormat does" +
-            " not require one.");
+    if (cmd.hasOption("eif")) {
+      giraphConfiguration.setEdgeInputFormatClass(
+          (Class<? extends EdgeInputFormat>)
+              Class.forName(cmd.getOptionValue("eif")));
+      if (cmd.hasOption("eip")) {
+        GiraphFileInputFormat.addEdgeInputPath(job.getInternalJob(),
+            new Path(cmd.getOptionValue("eip")));
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("No edge input path specified. Ensure your " +
+              "EdgeInputFormat does not require one.");
+        }
+      }
+    }
+
+    if (cmd.hasOption("of")) {
+      giraphConfiguration.setVertexOutputFormatClass(
+          (Class<? extends VertexOutputFormat>)
+              Class.forName(cmd.getOptionValue("of")));
+      if (cmd.hasOption("op")) {
+        FileOutputFormat.setOutputPath(job.getInternalJob(),
+            new Path(cmd.getOptionValue("op")));
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("No output path specified. Ensure your VertexOutputFormat " +
+              "does not require one.");
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
index c179a4b..213a69a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.BspUtils;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.utils.ReflectionUtils;
@@ -60,6 +61,8 @@ public class GiraphTypeValidator<I extends WritableComparable,
   private static final int MSG_PARAM_INDEX = 3;
   /** M param vertex combiner index in classList */
   private static final int MSG_COMBINER_PARAM_INDEX = 1;
+  /** E param edge input format index in classList */
+  private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
 
   /** Vertex Index Type */
   private Type vertexIndexType;
@@ -101,6 +104,7 @@ public class GiraphTypeValidator<I extends WritableComparable,
     edgeValueType = classList.get(EDGE_PARAM_INDEX);
     messageValueType = classList.get(MSG_PARAM_INDEX);
     verifyVertexInputFormatGenericTypes();
+    verifyEdgeInputFormatGenericTypes();
     verifyVertexOutputFormatGenericTypes();
     verifyVertexResolverGenericTypes();
     verifyVertexCombinerGenericTypes();
@@ -110,32 +114,63 @@ public class GiraphTypeValidator<I extends WritableComparable,
   private void verifyVertexInputFormatGenericTypes() {
     Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
       BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
-    List<Class<?>> classList =
-      ReflectionUtils.getTypeArguments(
-        VertexInputFormat.class, vertexInputFormatClass);
-    if (classList.get(ID_PARAM_INDEX) == null) {
-      LOG.warn("Input format vertex index type is not known");
-    } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Vertex index types don't match, " +
-          "vertex - " + vertexIndexType +
-          ", vertex input format - " + classList.get(ID_PARAM_INDEX));
-    }
-    if (classList.get(VALUE_PARAM_INDEX) == null) {
-      LOG.warn("Input format vertex value type is not known");
-    } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Vertex value types don't match, " +
-          "vertex - " + vertexValueType +
-          ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
+    if (vertexInputFormatClass != null) {
+      List<Class<?>> classList =
+          ReflectionUtils.getTypeArguments(
+              VertexInputFormat.class, vertexInputFormatClass);
+      if (classList.get(ID_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
+                "vertex - " + vertexIndexType +
+                ", vertex input format - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (classList.get(VALUE_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex value type is not known");
+      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex value types don't match, " +
+                "vertex - " + vertexValueType +
+                ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
+      }
+      if (classList.get(EDGE_PARAM_INDEX) == null) {
+        LOG.warn("Input format edge value type is not known");
+      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Edge value types don't match, " +
+                "vertex - " + edgeValueType +
+                ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
+      }
     }
-    if (classList.get(EDGE_PARAM_INDEX) == null) {
-      LOG.warn("Input format edge value type is not known");
-    } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Edge value types don't match, " +
-          "vertex - " + edgeValueType +
-          ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
+  }
+
+  /** Verify matching generic types in EdgeInputFormat. */
+  private void verifyEdgeInputFormatGenericTypes() {
+    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
+        BspUtils.<I, E>getEdgeInputFormatClass(conf);
+    if (edgeInputFormatClass != null) {
+      List<Class<?>> classList =
+          ReflectionUtils.getTypeArguments(
+              EdgeInputFormat.class, edgeInputFormatClass);
+      if (classList.get(ID_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
+                "vertex - " + vertexIndexType +
+                ", edge input format - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
+        LOG.warn("Input format edge value type is not known");
+      } else if (!edgeValueType.equals(
+          classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Edge value types don't match, " +
+                "vertex - " + edgeValueType +
+                ", edge input format - " +
+                classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
+      }
     }
   }
 


Mime
View raw message