giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [2/2] git commit: GIRAPH-453: Pure Hive I/O (nitay)
Date Fri, 22 Feb 2013 04:03:28 GMT
Updated Branches:
  refs/heads/trunk ea914c9c0 -> 92517430e


GIRAPH-453: Pure Hive I/O (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/92517430
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/92517430
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/92517430

Branch: refs/heads/trunk
Commit: 92517430e6c80d9de19aedf1aafc57367d5240f0
Parents: ea914c9
Author: Nitay Joffe <nitayj@fb.com>
Authored: Wed Nov 14 18:51:50 2012 -0800
Committer: Nitay Joffe <nitay@apache.org>
Committed: Thu Feb 21 22:25:32 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 giraph-accumulo/pom.xml                            |    1 -
 .../org/apache/giraph/comm/netty/NettyClient.java  |    2 +-
 .../apache/giraph/conf/GiraphConfiguration.java    |    4 +-
 giraph-hbase/pom.xml                               |    1 -
 giraph-hcatalog/pom.xml                            |    1 -
 .../giraph/io/hcatalog/HCatGiraphRunner.java       |  490 ++++++++++
 .../giraph/io/hcatalog/HiveGiraphRunner.java       |  490 ----------
 .../org/apache/giraph/io/hcatalog/HiveUtils.java   |    2 +-
 giraph-hive/pom.xml                                |  137 +++
 giraph-hive/src/main/assembly/compile.xml          |   39 +
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |  705 +++++++++++++++
 .../apache/giraph/hive/common/HiveProfiles.java    |   37 +
 .../apache/giraph/hive/common/package-info.java    |   21 +
 .../hive/input/edge/HiveEdgeInputFormat.java       |   82 ++
 .../giraph/hive/input/edge/HiveEdgeReader.java     |  173 ++++
 .../apache/giraph/hive/input/edge/HiveToEdge.java  |   58 ++
 .../giraph/hive/input/edge/package-info.java       |   21 +
 .../org/apache/giraph/hive/input/package-info.java |   21 +
 .../giraph/hive/input/vertex/HiveToVertex.java     |   45 +
 .../hive/input/vertex/HiveVertexInputFormat.java   |   84 ++
 .../giraph/hive/input/vertex/HiveVertexReader.java |  146 +++
 .../giraph/hive/input/vertex/package-info.java     |   21 +
 .../giraph/hive/output/HiveVertexOutputFormat.java |   83 ++
 .../giraph/hive/output/HiveVertexWriter.java       |  136 +++
 .../apache/giraph/hive/output/VertexToHive.java    |   44 +
 .../apache/giraph/hive/output/package-info.java    |   22 +
 .../java/org/apache/giraph/hive/package-info.java  |   21 +
 pom.xml                                            |   47 +-
 29 files changed, 2434 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 742b6ea..60dcbf1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-453: Pure Hive I/O (nitay)
+
   GIRAPH-526: HiveGiraphRunner - bug with setting database name (majakabiljo)
 
   GIRAPH-518: Support Hadoop-2.0.3-alpha release on Giraph (ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-accumulo/pom.xml b/giraph-accumulo/pom.xml
index cb9fbc0..2734f71 100644
--- a/giraph-accumulo/pom.xml
+++ b/giraph-accumulo/pom.xml
@@ -205,7 +205,6 @@ under the License.
     <dependency>
       <groupId>org.apache.giraph</groupId>
       <artifactId>giraph-core</artifactId>
-      <version>0.2-SNAPSHOT</version>
       <type>test-jar</type>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 89ef87f..feae3e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -126,7 +126,7 @@ public class NettyClient {
   private final boolean limitNumberOfOpenRequests;
   /** Maximum number of requests without confirmation we can have */
   private final int maxNumberOfOpenRequests;
-  /** Maximum number of connnection failures */
+  /** Maximum number of connection failures */
   private final int maxConnectionFailures;
   /** Maximum number of milliseconds for a request */
   private final int maxRequestMilliseconds;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ddeaeb7..3ea8d3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -242,9 +242,7 @@ public class GiraphConfiguration extends Configuration
    */
   public final void setVertexCombinerClass(
       Class<? extends Combiner> vertexCombinerClass) {
-    setClass(VERTEX_COMBINER_CLASS,
-        vertexCombinerClass,
-        Combiner.class);
+    setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, Combiner.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hbase/pom.xml b/giraph-hbase/pom.xml
index 7bbbd98..3d24f3b 100644
--- a/giraph-hbase/pom.xml
+++ b/giraph-hbase/pom.xml
@@ -213,7 +213,6 @@ under the License.
     <dependency>
       <groupId>org.apache.giraph</groupId>
       <artifactId>giraph-core</artifactId>
-      <version>0.2-SNAPSHOT</version>
       <type>test-jar</type>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/pom.xml b/giraph-hcatalog/pom.xml
index 019f020..b4ca06c 100644
--- a/giraph-hcatalog/pom.xml
+++ b/giraph-hcatalog/pom.xml
@@ -186,7 +186,6 @@ under the License.
     <dependency>
       <groupId>org.apache.giraph</groupId>
       <artifactId>giraph-core</artifactId>
-      <version>0.2-SNAPSHOT</version>
       <type>test-jar</type>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
new file mode 100644
index 0000000..1bd0235
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Giraph runner for Hive tables accessed through HCatalog
+ */
+public class HCatGiraphRunner implements Tool {
+  /**
+   * logger
+   */
+  private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
+  /**
+   * workers
+   */
+  protected int workers;
+  /**
+   * is verbose
+   */
+  protected boolean isVerbose;
+  /**
+   * output table partitions
+   */
+  protected Map<String, String> outputTablePartitionValues;
+  /**
+   * dbName
+   */
+  protected String dbName;
+  /**
+   * vertex input table name
+   */
+  protected String vertexInputTableName;
+  /**
+   * vertex input table filter
+   */
+  protected String vertexInputTableFilterExpr;
+  /**
+   * edge input table name
+   */
+  protected String edgeInputTableName;
+  /**
+   * edge input table filter
+   */
+  protected String edgeInputTableFilterExpr;
+  /**
+   * output table name
+   */
+  protected String outputTableName;
+  /** Configuration */
+  private Configuration conf;
+  /** Skip output? (Useful for testing without writing) */
+  private boolean skipOutput = false;
+
+  /**
+  * vertex class.
+  */
+  private Class<? extends Vertex> vertexClass;
+  /**
+   * vertex input format internal.
+   */
+  private Class<? extends VertexInputFormat> vertexInputFormatClass;
+  /**
+   * edge input format internal.
+   */
+  private Class<? extends EdgeInputFormat> edgeInputFormatClass;
+  /**
+  * vertex output format internal.
+  */
+  private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
+
+  /**
+  * Giraph runner class.
+   *
+  * @param vertexClass Vertex class
+  * @param vertexInputFormatClass Vertex input format
+  * @param edgeInputFormatClass Edge input format
+  * @param vertexOutputFormatClass Output format
+  */
+  protected HCatGiraphRunner(
+      Class<? extends Vertex> vertexClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends EdgeInputFormat> edgeInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+    this.vertexClass = vertexClass;
+    this.vertexInputFormatClass = vertexInputFormatClass;
+    this.edgeInputFormatClass = edgeInputFormatClass;
+    this.vertexOutputFormatClass = vertexOutputFormatClass;
+    this.conf = new HiveConf(getClass());
+  }
+
+  /**
+  * main method
+  * @param args system arguments
+  * @throws Exception any errors from Hive Giraph Runner
+  */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(
+        new HCatGiraphRunner(null, null, null, null), args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    // process args
+    try {
+      processArguments(args);
+    } catch (InterruptedException e) {
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive
+    adjustConfigurationForHive(getConf());
+
+    // setup GiraphJob
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(vertexClass);
+
+    // setup input from Hive
+    if (vertexInputFormatClass != null) {
+      InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
+          vertexInputTableName, vertexInputTableFilterExpr);
+      GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
+          vertexInputJobInfo);
+      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+    }
+    if (edgeInputFormatClass != null) {
+      InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
+          edgeInputTableName, edgeInputTableFilterExpr);
+      GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
+          edgeInputJobInfo);
+      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
+    }
+
+    // setup output to Hive
+    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+        dbName, outputTableName, outputTablePartitionValues));
+    HCatOutputFormat.setSchema(job.getInternalJob(),
+        HCatOutputFormat.getTableSchema(job.getInternalJob()));
+    if (skipOutput) {
+      LOG.warn("run: Warning - Output will be skipped!");
+    } else {
+      job.getConfiguration().setVertexOutputFormatClass(
+          vertexOutputFormatClass);
+    }
+
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+  * set hive configuration
+  * @param conf Configuration argument
+  */
+  private static void adjustConfigurationForHive(Configuration conf) {
+    // when output partitions are used, workers register them to the
+    // metastore at cleanup stage, and HiveConf's initialization looks for
+    // hive-site.xml, so it is added to "tmpfiles" for the workers.
+    addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+        .getResource("hive-site.xml").toString());
+
+    // Also, you need hive.aux.jars as well
+    // addToStringCollection(conf, "tmpjars",
+    // conf.getStringCollection("hive.aux.jars.path"));
+
+    // Or, more effectively, we can provide all the jars the client needs to
+    // the workers as well
+    String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
+        File.pathSeparator);
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopJars) {
+      File file = new File(jarPath);
+      if (file.exists() && file.isFile()) {
+        String jarURL = file.toURI().toString();
+        hadoopJarURLs.add(jarURL);
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+  * process arguments
+  * @param args to process
+  * @return CommandLine instance
+  * @throws ParseException error parsing arguments
+  * @throws InterruptedException interrupted
+  */
+  private CommandLine processArguments(String[] args) throws ParseException,
+            InterruptedException {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+                "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+          "Giraph Vertex class to use");
+    }
+    if (vertexInputFormatClass == null) {
+      options.addOption(null, "vertexInputFormatClass", true,
+          "Giraph HCatalogVertexInputFormat class to use");
+    }
+    if (edgeInputFormatClass == null) {
+      options.addOption(null, "edgeInputFormatClass", true,
+          "Giraph HCatalogEdgeInputFormat class to use");
+    }
+
+    if (vertexOutputFormatClass == null) {
+      options.addOption(null, "vertexOutputFormatClass", true,
+          "Giraph HCatalogVertexOutputFormat class to use");
+    }
+
+    options.addOption("db", "dbName", true, "Hive database name");
+    options.addOption("vi", "vertexInputTable", true,
+        "Vertex input table name");
+    options.addOption("VI", "vertexInputFilter", true,
+        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("ei", "edgeInputTable", true,
+        "Edge input table name");
+    options.addOption("EI", "edgeInputFilter", true,
+        "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("s", "skipOutput", false, "Skip output?");
+
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    // Giraph classes
+    if (cmdln.hasOption("vertexClass")) {
+      vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+          Vertex.class);
+    }
+    if (cmdln.hasOption("vertexInputFormatClass")) {
+      vertexInputFormatClass = findClass(
+          cmdln.getOptionValue("vertexInputFormatClass"),
+          HCatalogVertexInputFormat.class);
+    }
+    if (cmdln.hasOption("edgeInputFormatClass")) {
+      edgeInputFormatClass = findClass(
+          cmdln.getOptionValue("edgeInputFormatClass"),
+          HCatalogEdgeInputFormat.class);
+    }
+
+    if (cmdln.hasOption("vertexOutputFormatClass")) {
+      vertexOutputFormatClass = findClass(
+          cmdln.getOptionValue("vertexOutputFormatClass"),
+          HCatalogVertexOutputFormat.class);
+    }
+
+    if (cmdln.hasOption("skipOutput")) {
+      skipOutput = true;
+    }
+
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph Vertex class name (-vertexClass) to use");
+    }
+    if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need at least one of Giraph VertexInputFormat " +
+              "class name (-vertexInputFormatClass) and " +
+              "EdgeInputFormat class name (-edgeInputFormatClass)");
+    }
+    if (vertexOutputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph VertexOutputFormat " +
+              "class name (-vertexOutputFormatClass) to use");
+    }
+    if (!cmdln.hasOption("workers")) {
+      throw new IllegalArgumentException(
+          "Need to choose the number of workers (-w)");
+    }
+    if (!cmdln.hasOption("vertexInputTable") &&
+        vertexInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the vertex input table name (-vi)");
+    }
+    if (!cmdln.hasOption("edgeInputTable") &&
+        edgeInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the edge input table name (-ei)");
+    }
+    if (!cmdln.hasOption("outputTable")) {
+      throw new IllegalArgumentException(
+          "Need to set the output table name (-o)");
+    }
+    dbName = cmdln.getOptionValue("dbName", "default");
+    vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
+    vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
+    edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
+    edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
+    outputTableName = cmdln.getOptionValue("outputTable");
+    outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
+                .getOptionValue("outputPartition"));
+    workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments
+    for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
+      String[] keyval = hiveconf.split("=", 2);
+      if (keyval.length == 2) {
+        String name = keyval[0];
+        String value = keyval[1];
+        if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+          addToStringCollection(
+                  conf, name, value);
+        } else {
+          conf.set(name, value);
+        }
+      }
+    }
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(Configuration conf, String name,
+                                              String... values) {
+    addToStringCollection(conf, name, Arrays.asList(values));
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(
+          Configuration conf, String name, Collection
+          <? extends String> values) {
+    Collection<String> tmpfiles = conf.getStringCollection(name);
+    tmpfiles.addAll(values);
+    conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+  }
+
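+  /**
+  * Find a class by name and check that it is a subtype of base
+  * @param className to find
+  * @param base  base class
+  * @param <T> class type found
+  * @return class found, or null if it is not a subtype of base
+  */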
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    try {
+      Class<?> cls = Class.forName(className);
+      if (base.isAssignableFrom(cls)) {
+        return cls.asSubclass(base);
+      }
+      return null;
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(className + ": Invalid class name");
+    }
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+  * Override this method to add more command-line options. You can process
+  * them by also overriding {@link #processMoreArguments(CommandLine)}.
+  *
+  * @param options Options
+  */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+  * Override this method to process additional command-line arguments. You
+  * may want to declare additional options by also overriding
+  * {@link #addMoreOptions(Options)}.
+  *
+  * @param cmd Command
+  */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+  * Override this method to do additional setup with the GiraphJob that will
+  * run.
+  *
+  * @param job
+  *            GiraphJob that is going to run
+  */
+  protected void initGiraphJob(GiraphJob job) {
+    LOG.info(getClass().getSimpleName() + " with");
+    String prefix = "\t";
+    LOG.info(prefix + "-vertexClass=" +
+         vertexClass.getCanonicalName());
+    if (vertexInputFormatClass != null) {
+      LOG.info(prefix + "-vertexInputFormatClass=" +
+          vertexInputFormatClass.getCanonicalName());
+    }
+    if (edgeInputFormatClass != null) {
+      LOG.info(prefix + "-edgeInputFormatClass=" +
+          edgeInputFormatClass.getCanonicalName());
+    }
+    LOG.info(prefix + "-vertexOutputFormatClass=" +
+        vertexOutputFormatClass.getCanonicalName());
+    if (vertexInputTableName != null) {
+      LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
+    }
+    if (vertexInputTableFilterExpr != null) {
+      LOG.info(prefix + "-vertexInputFilter=\"" +
+          vertexInputTableFilterExpr + "\"");
+    }
+    if (edgeInputTableName != null) {
+      LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
+    }
+    if (edgeInputTableFilterExpr != null) {
+      LOG.info(prefix + "-edgeInputFilter=\"" +
+          edgeInputTableFilterExpr + "\"");
+    }
+    LOG.info(prefix + "-outputTable=" + outputTableName);
+    if (outputTablePartitionValues != null) {
+      LOG.info(prefix + "-outputPartition=\"" +
+          outputTablePartitionValues + "\"");
+    }
+    LOG.info(prefix + "-workers=" + workers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
deleted file mode 100644
index 313bab0..0000000
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Hive Giraph Runner
- */
-public class HiveGiraphRunner implements Tool {
-  /**
-   * logger
-   */
-  private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
-  /**
-   * workers
-   */
-  protected int workers;
-  /**
-   * is verbose
-   */
-  protected boolean isVerbose;
-  /**
-   * output table partitions
-   */
-  protected Map<String, String> outputTablePartitionValues;
-  /**
-   * dbName
-   */
-  protected String dbName;
-  /**
-   * vertex input table name
-   */
-  protected String vertexInputTableName;
-  /**
-   * vertex input table filter
-   */
-  protected String vertexInputTableFilterExpr;
-  /**
-   * edge input table name
-   */
-  protected String edgeInputTableName;
-  /**
-   * edge input table filter
-   */
-  protected String edgeInputTableFilterExpr;
-  /**
-   * output table name
-   */
-  protected String outputTableName;
-  /** Configuration */
-  private Configuration conf;
-  /** Skip output? (Useful for testing without writing) */
-  private boolean skipOutput = false;
-
-  /**
-  * vertex class.
-  */
-  private Class<? extends Vertex> vertexClass;
-  /**
-   * vertex input format internal.
-   */
-  private Class<? extends VertexInputFormat> vertexInputFormatClass;
-  /**
-   * edge input format internal.
-   */
-  private Class<? extends EdgeInputFormat> edgeInputFormatClass;
-  /**
-  * vertex output format internal.
-  */
-  private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
-
-  /**
-  * Giraph runner class.
-   *
-  * @param vertexClass Vertex class
-  * @param vertexInputFormatClass Vertex input format
-  * @param edgeInputFormatClass Edge input format
-  * @param vertexOutputFormatClass Output format
-  */
-  protected HiveGiraphRunner(
-      Class<? extends Vertex> vertexClass,
-      Class<? extends VertexInputFormat> vertexInputFormatClass,
-      Class<? extends EdgeInputFormat> edgeInputFormatClass,
-      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
-    this.vertexClass = vertexClass;
-    this.vertexInputFormatClass = vertexInputFormatClass;
-    this.edgeInputFormatClass = edgeInputFormatClass;
-    this.vertexOutputFormatClass = vertexOutputFormatClass;
-    this.conf = new HiveConf(getClass());
-  }
-
-  /**
-  * main method
-  * @param args system arguments
-  * @throws Exception any errors from Hive Giraph Runner
-  */
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(
-        new HiveGiraphRunner(null, null, null, null), args));
-  }
-
-  @Override
-  public final int run(String[] args) throws Exception {
-    // process args
-    try {
-      processArguments(args);
-    } catch (InterruptedException e) {
-      return 0;
-    } catch (IllegalArgumentException e) {
-      System.err.println(e.getMessage());
-      return -1;
-    }
-
-    // additional configuration for Hive
-    adjustConfigurationForHive(getConf());
-
-    // setup GiraphJob
-    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-    job.getConfiguration().setVertexClass(vertexClass);
-
-    // setup input from Hive
-    if (vertexInputFormatClass != null) {
-      InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
-          vertexInputTableName, vertexInputTableFilterExpr);
-      GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
-          vertexInputJobInfo);
-      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
-    }
-    if (edgeInputFormatClass != null) {
-      InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
-          edgeInputTableName, edgeInputTableFilterExpr);
-      GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
-          edgeInputJobInfo);
-      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
-    }
-
-    // setup output to Hive
-    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
-        dbName, outputTableName, outputTablePartitionValues));
-    HCatOutputFormat.setSchema(job.getInternalJob(),
-        HCatOutputFormat.getTableSchema(job.getInternalJob()));
-    if (skipOutput) {
-      LOG.warn("run: Warning - Output will be skipped!");
-    } else {
-      job.getConfiguration().setVertexOutputFormatClass(
-          vertexOutputFormatClass);
-    }
-
-    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
-    initGiraphJob(job);
-
-    return job.run(isVerbose) ? 0 : -1;
-  }
-
-  /**
-  * set hive configuration
-  * @param conf Configuration argument
-  */
-  private static void adjustConfigurationForHive(Configuration conf) {
-    // when output partitions are used, workers register them to the
-    // metastore at cleanup stage, and on HiveConf's initialization, it
-    // looks for hive-site.xml from.
-    addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
-        .getResource("hive-site.xml").toString());
-
-    // Also, you need hive.aux.jars as well
-    // addToStringCollection(conf, "tmpjars",
-    // conf.getStringCollection("hive.aux.jars.path"));
-
-    // Or, more effectively, we can provide all the jars client needed to
-    // the workers as well
-    String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
-        File.pathSeparator);
-    List<String> hadoopJarURLs = Lists.newArrayList();
-    for (String jarPath : hadoopJars) {
-      File file = new File(jarPath);
-      if (file.exists() && file.isFile()) {
-        String jarURL = file.toURI().toString();
-        hadoopJarURLs.add(jarURL);
-      }
-    }
-    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
-  }
-
-  /**
-  * process arguments
-  * @param args to process
-  * @return CommandLine instance
-  * @throws ParseException error parsing arguments
-  * @throws InterruptedException interrupted
-  */
-  private CommandLine processArguments(String[] args) throws ParseException,
-            InterruptedException {
-    Options options = new Options();
-    options.addOption("h", "help", false, "Help");
-    options.addOption("v", "verbose", false, "Verbose");
-    options.addOption("D", "hiveconf", true,
-                "property=value for Hive/Hadoop configuration");
-    options.addOption("w", "workers", true, "Number of workers");
-    if (vertexClass == null) {
-      options.addOption(null, "vertexClass", true,
-          "Giraph Vertex class to use");
-    }
-    if (vertexInputFormatClass == null) {
-      options.addOption(null, "vertexInputFormatClass", true,
-          "Giraph HCatalogVertexInputFormat class to use");
-    }
-    if (edgeInputFormatClass == null) {
-      options.addOption(null, "edgeInputFormatClass", true,
-          "Giraph HCatalogEdgeInputFormat class to use");
-    }
-
-    if (vertexOutputFormatClass == null) {
-      options.addOption(null, "vertexOutputFormatClass", true,
-          "Giraph HCatalogVertexOutputFormat class to use");
-    }
-
-    options.addOption("db", "dbName", true, "Hive database name");
-    options.addOption("vi", "vertexInputTable", true,
-        "Vertex input table name");
-    options.addOption("VI", "vertexInputFilter", true,
-        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
-    options.addOption("ei", "edgeInputTable", true,
-        "Edge input table name");
-    options.addOption("EI", "edgeInputFilter", true,
-        "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
-    options.addOption("o", "outputTable", true, "Output table name");
-    options.addOption("O", "outputPartition", true,
-        "Output table partition values (e.g., \"a=1,b=two\")");
-    options.addOption("s", "skipOutput", false, "Skip output?");
-
-    addMoreOptions(options);
-
-    CommandLineParser parser = new GnuParser();
-    final CommandLine cmdln = parser.parse(options, args);
-    if (args.length == 0 || cmdln.hasOption("help")) {
-      new HelpFormatter().printHelp(getClass().getName(), options, true);
-      throw new InterruptedException();
-    }
-
-    // Giraph classes
-    if (cmdln.hasOption("vertexClass")) {
-      vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
-          Vertex.class);
-    }
-    if (cmdln.hasOption("vertexInputFormatClass")) {
-      vertexInputFormatClass = findClass(
-          cmdln.getOptionValue("vertexInputFormatClass"),
-          HCatalogVertexInputFormat.class);
-    }
-    if (cmdln.hasOption("edgeInputFormatClass")) {
-      edgeInputFormatClass = findClass(
-          cmdln.getOptionValue("edgeInputFormatClass"),
-          HCatalogEdgeInputFormat.class);
-    }
-
-    if (cmdln.hasOption("vertexOutputFormatClass")) {
-      vertexOutputFormatClass = findClass(
-          cmdln.getOptionValue("vertexOutputFormatClass"),
-          HCatalogVertexOutputFormat.class);
-    }
-
-    if (cmdln.hasOption("skipOutput")) {
-      skipOutput = true;
-    }
-
-    if (vertexClass == null) {
-      throw new IllegalArgumentException(
-          "Need the Giraph Vertex class name (-vertexClass) to use");
-    }
-    if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
-      throw new IllegalArgumentException(
-          "Need at least one of Giraph VertexInputFormat " +
-              "class name (-vertexInputFormatClass) and " +
-              "EdgeInputFormat class name (-edgeInputFormatClass)");
-    }
-    if (vertexOutputFormatClass == null) {
-      throw new IllegalArgumentException(
-          "Need the Giraph VertexOutputFormat " +
-              "class name (-vertexOutputFormatClass) to use");
-    }
-    if (!cmdln.hasOption("workers")) {
-      throw new IllegalArgumentException(
-          "Need to choose the number of workers (-w)");
-    }
-    if (!cmdln.hasOption("vertexInputTable") &&
-        vertexInputFormatClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the vertex input table name (-vi)");
-    }
-    if (!cmdln.hasOption("edgeInputTable") &&
-        edgeInputFormatClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the edge input table name (-ei)");
-    }
-    if (!cmdln.hasOption("outputTable")) {
-      throw new IllegalArgumentException(
-          "Need to set the output table name (-o)");
-    }
-    dbName = cmdln.getOptionValue("dbName", "default");
-    vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
-    vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
-    edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
-    edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
-    outputTableName = cmdln.getOptionValue("outputTable");
-    outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
-                .getOptionValue("outputPartition"));
-    workers = Integer.parseInt(cmdln.getOptionValue("workers"));
-    isVerbose = cmdln.hasOption("verbose");
-
-    // pick up -hiveconf arguments
-    for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
-      String[] keyval = hiveconf.split("=", 2);
-      if (keyval.length == 2) {
-        String name = keyval[0];
-        String value = keyval[1];
-        if (name.equals("tmpjars") || name.equals("tmpfiles")) {
-          addToStringCollection(
-                  conf, name, value);
-        } else {
-          conf.set(name, value);
-        }
-      }
-    }
-
-    processMoreArguments(cmdln);
-
-    return cmdln;
-  }
-
-  /**
-  * add string to collection
-  * @param conf Configuration
-  * @param name name to add
-  * @param values values for collection
-  */
-  private static void addToStringCollection(Configuration conf, String name,
-                                              String... values) {
-    addToStringCollection(conf, name, Arrays.asList(values));
-  }
-
-  /**
-  * add string to collection
-  * @param conf Configuration
-  * @param name to add
-  * @param values values for collection
-  */
-  private static void addToStringCollection(
-          Configuration conf, String name, Collection
-          <? extends String> values) {
-    Collection<String> tmpfiles = conf.getStringCollection(name);
-    tmpfiles.addAll(values);
-    conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
-  }
-
-  /**
-  *
-  * @param className to find
-  * @param base  base class
-  * @param <T> class type found
-  * @return type found
-  */
-  private <T> Class<? extends T> findClass(String className, Class<T> base) {
-    try {
-      Class<?> cls = Class.forName(className);
-      if (base.isAssignableFrom(cls)) {
-        return cls.asSubclass(base);
-      }
-      return null;
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(className + ": Invalid class name");
-    }
-  }
-
-  @Override
-  public final Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public final void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-  * Override this method to add more command-line options. You can process
-  * them by also overriding {@link #processMoreArguments(CommandLine)}.
-  *
-  * @param options Options
-  */
-  protected void addMoreOptions(Options options) {
-  }
-
-  /**
-  * Override this method to process additional command-line arguments. You
-  * may want to declare additional options by also overriding
-  * {@link #addMoreOptions(Options)}.
-  *
-  * @param cmd Command
-  */
-  protected void processMoreArguments(CommandLine cmd) {
-  }
-
-  /**
-  * Override this method to do additional setup with the GiraphJob that will
-  * run.
-  *
-  * @param job
-  *            GiraphJob that is going to run
-  */
-  protected void initGiraphJob(GiraphJob job) {
-    LOG.info(getClass().getSimpleName() + " with");
-    String prefix = "\t";
-    LOG.info(prefix + "-vertexClass=" +
-         vertexClass.getCanonicalName());
-    if (vertexInputFormatClass != null) {
-      LOG.info(prefix + "-vertexInputFormatClass=" +
-          vertexInputFormatClass.getCanonicalName());
-    }
-    if (edgeInputFormatClass != null) {
-      LOG.info(prefix + "-edgeInputFormatClass=" +
-          edgeInputFormatClass.getCanonicalName());
-    }
-    LOG.info(prefix + "-vertexOutputFormatClass=" +
-        vertexOutputFormatClass.getCanonicalName());
-    if (vertexInputTableName != null) {
-      LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
-    }
-    if (vertexInputTableFilterExpr != null) {
-      LOG.info(prefix + "-vertexInputFilter=\"" +
-          vertexInputTableFilterExpr + "\"");
-    }
-    if (edgeInputTableName != null) {
-      LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
-    }
-    if (edgeInputTableFilterExpr != null) {
-      LOG.info(prefix + "-edgeInputFilter=\"" +
-          edgeInputTableFilterExpr + "\"");
-    }
-    LOG.info(prefix + "-outputTable=" + outputTableName);
-    if (outputTablePartitionValues != null) {
-      LOG.info(prefix + "-outputPartition=\"" +
-          outputTablePartitionValues + "\"");
-    }
-    LOG.info(prefix + "-workers=" + workers);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
index c1f76f1..fedd92c 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
@@ -43,7 +43,7 @@ public class HiveUtils {
   * @return Map
   */
   public static Map<String, String> parsePartitionValues(
-            String outputTablePartitionString) {
+      String outputTablePartitionString) {
     if (outputTablePartitionString == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hive/pom.xml b/giraph-hive/pom.xml
new file mode 100644
index 0000000..84b7c91
--- /dev/null
+++ b/giraph-hive/pom.xml
@@ -0,0 +1,137 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.giraph</groupId>
+    <artifactId>giraph-parent</artifactId>
+    <version>0.2-SNAPSHOT</version>
+  </parent>
+  <artifactId>giraph-hive</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Giraph Hive I/O</name>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.9</version>
+        <configuration>
+          <configLocation>${top.dir}/checkstyle.xml</configLocation>
+          <headerLocation>${top.dir}/license-header.txt</headerLocation>
+          <enableRulesSummary>false</enableRulesSummary>
+          <failOnError>true</failOnError>
+          <includeTestSourceDirectory>false</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+               <goal>check</goal>
+             </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>prop.jarLocation</name>
+              <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <version>2.5.1</version>
+        <configuration>
+          <xmlOutput>true</xmlOutput>
+          <findbugsXmlOutput>false</findbugsXmlOutput>
+          <excludeFilterFile>${top.dir}/findbugs-exclude.xml</excludeFilterFile>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- compile dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>com.facebook.giraph.hive</groupId>
+      <artifactId>hive-io-experimental</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.spullara.cli-parser</groupId>
+      <artifactId>cli-parser</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+    </dependency>
+
+    <!-- test dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/assembly/compile.xml b/giraph-hive/src/main/assembly/compile.xml
new file mode 100644
index 0000000..fcaffa6
--- /dev/null
+++ b/giraph-hive/src/main/assembly/compile.xml
@@ -0,0 +1,39 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE</exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>true</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
new file mode 100644
index 0000000..cef96b5
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
+import org.apache.giraph.hive.input.edge.HiveEdgeReader;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
+import org.apache.giraph.hive.input.vertex.HiveVertexReader;
+import org.apache.giraph.hive.output.HiveVertexOutputFormat;
+import org.apache.giraph.hive.output.HiveVertexWriter;
+import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+import com.facebook.giraph.hive.input.HiveInputDescription;
+import com.facebook.giraph.hive.output.HiveApiOutputFormat;
+import com.facebook.giraph.hive.output.HiveOutputDescription;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hive Giraph Runner
+ */
+public class HiveGiraphRunner implements Tool {
+  /** logger */
+  private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
+  /** Prefix for log statements */
+  private static final String LOG_PREFIX = "\t";
+
+  /** workers */
+  protected int workers;
+  /** is verbose */
+  protected boolean isVerbose;
+
+  /** vertex class. */
+  private Class<? extends Vertex> vertexClass;
+
+  /** Vertex creator from hive records. */
+  private Class<? extends HiveToVertex> hiveToVertexClass;
+  /** hive vertex input information */
+  private  final HiveInputDescription hiveVertexInputDescription;
+
+  /** Edge creator from hive records. */
+  private Class<? extends HiveToEdge> hiveToEdgeClass;
+  /** hive edge input information */
+  private final HiveInputDescription hiveEdgeInputDescription;
+
+  /** Hive Vertex writer */
+  private Class<? extends VertexToHive> vertexToHiveClass;
+  /** hive output information */
+  private final HiveOutputDescription hiveOutputDescription;
+  /** Skip output? (Useful for testing without writing) */
+  private boolean skipOutput = false;
+
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Create a new runner with a {@link HiveConf} built for this runner's
+   * class and newly constructed vertex input, edge input and output
+   * descriptions.
+   */
+  public HiveGiraphRunner() {
+    conf = new HiveConf(getClass());
+    hiveVertexInputDescription = new HiveInputDescription();
+    hiveEdgeInputDescription = new HiveInputDescription();
+    hiveOutputDescription = new HiveOutputDescription();
+  }
+
+  /**
+   * Get Vertex class used.
+   *
+   * @return Vertex class, or null if none has been set yet
+   */
+  public Class<? extends Vertex> getVertexClass() {
+    return vertexClass;
+  }
+
+  /**
+   * Set Vertex class to use. The value is only stored here; {@code run}
+   * hands it to the Giraph configuration when the job is set up.
+   *
+   * @param vertexClass Vertex class
+   */
+  public void setVertexClass(Class<? extends Vertex> vertexClass) {
+    this.vertexClass = vertexClass;
+  }
+
+  /**
+   * Get hive vertex input description. This is the runner's own instance,
+   * not a copy.
+   *
+   * @return HiveInputDescription used for vertex input
+   */
+  public HiveInputDescription getHiveVertexInputDescription() {
+    return hiveVertexInputDescription;
+  }
+
+  /**
+   * Get hive output description. This is the runner's own instance, not a
+   * copy.
+   *
+   * @return HiveOutputDescription used for vertex output
+   */
+  public HiveOutputDescription getHiveOutputDescription() {
+    return hiveOutputDescription;
+  }
+
+  /**
+   * Get hive edge input description. This is the runner's own instance, not
+   * a copy.
+   *
+   * @return HiveInputDescription used for edge input
+   */
+  public HiveInputDescription getHiveEdgeInputDescription() {
+    return hiveEdgeInputDescription;
+  }
+
+  /**
+   * Get the HiveToVertex class used with HiveVertexInputFormat
+   *
+   * @return HiveToVertex class, or null if vertex input is not configured
+   */
+  public Class<? extends HiveToVertex> getHiveToVertexClass() {
+    return hiveToVertexClass;
+  }
+
+  /**
+   * Whether to use vertex input.
+   *
+   * @return true if vertex input enabled (a HiveToVertex class is set).
+   */
+  public boolean hasVertexInput() {
+    return hiveToVertexClass != null;
+  }
+
+  /**
+   * Set class used to convert hive records to Vertexes. This simply
+   * delegates to {@link #setHiveToVertexClass(Class)}.
+   *
+   * @param hiveToVertexValueClass HiveToVertex class
+   */
+  public void setHiveToVertexValueClass(
+      Class<? extends HiveToVertex> hiveToVertexValueClass) {
+    setHiveToVertexClass(hiveToVertexValueClass);
+  }
+
+  /**
+   * Set the HiveToVertex class used with HiveVertexInputFormat. The class
+   * is stored on this runner and also written to the configuration under
+   * {@code HiveVertexReader.HIVE_TO_VERTEX_KEY}.
+   *
+   * @param hiveToVertexClass HiveToVertex class
+   */
+  public void setHiveToVertexClass(
+      Class<? extends HiveToVertex> hiveToVertexClass) {
+    this.hiveToVertexClass = hiveToVertexClass;
+    conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass,
+        HiveToVertex.class);
+  }
+
+  /**
+   * Get the HiveToEdge class used with HiveEdgeInputFormat
+   *
+   * @return HiveToEdge class, or null if edge input is not configured
+   */
+  public Class<? extends HiveToEdge> getHiveToEdgeClass() {
+    return hiveToEdgeClass;
+  }
+
+  /**
+   * Whether to use edge input.
+   *
+   * @return true if edge input enabled (a HiveToEdge class is set).
+   */
+  public boolean hasEdgeInput() {
+    return hiveToEdgeClass != null;
+  }
+
+  /**
+   * Set the HiveToEdge class used with HiveEdgeInputFormat. The class is
+   * stored on this runner and also written to the configuration under
+   * {@code HiveEdgeReader.EDGE_CREATOR_KEY}.
+   *
+   * @param hiveToEdgeClass HiveToEdge class
+   */
+  public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
+    this.hiveToEdgeClass = hiveToEdgeClass;
+    conf.setClass(HiveEdgeReader.EDGE_CREATOR_KEY, hiveToEdgeClass,
+        HiveToEdge.class);
+  }
+
+  /**
+   * Get class used to write vertices to Hive.
+   *
+   * @return class for writing vertices to Hive, or null if none is set
+   */
+  public Class<? extends VertexToHive> getVertexToHiveClass() {
+    return vertexToHiveClass;
+  }
+
+  /**
+   * Whether we are writing vertices out.
+   *
+   * @return true if output is not skipped and a VertexToHive class is set
+   */
+  public boolean hasVertexOutput() {
+    return !skipOutput && vertexToHiveClass != null;
+  }
+
+  /**
+   * Set class used to write vertices to Hive. The class is stored on this
+   * runner and also written to the configuration under
+   * {@code HiveVertexWriter.VERTEX_TO_HIVE_KEY}.
+   *
+   * @param vertexToHiveClass class for writing vertices to Hive.
+   */
+  public void setVertexToHiveClass(
+      Class<? extends VertexToHive> vertexToHiveClass) {
+    this.vertexToHiveClass = vertexToHiveClass;
+    conf.setClass(HiveVertexWriter.VERTEX_TO_HIVE_KEY, vertexToHiveClass,
+        VertexToHive.class);
+  }
+
+  /**
+   * Command-line entry point: runs a new HiveGiraphRunner through
+   * ToolRunner and exits the JVM with the code it returns.
+   *
+   * @param args system arguments
+   * @throws Exception any errors from Hive Giraph Runner
+   */
+  public static void main(String[] args) throws Exception {
+    HiveGiraphRunner runner = new HiveGiraphRunner();
+    System.exit(ToolRunner.run(runner, args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    // process args; help (or no args) returns 0, invalid arguments return -1
+    try {
+      handleCommandLine(args);
+    } catch (InterruptedException e) {
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive (hive-site.xml and client jars)
+    adjustConfigurationForHive();
+
+    // setup GiraphJob on this runner's configuration, named after its class
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    GiraphConfiguration giraphConf = job.getConfiguration();
+    giraphConf.setVertexClass(vertexClass);
+
+    setupHiveInputs(giraphConf);
+    setupHiveOutput(giraphConf);
+
+    giraphConf.setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    logOptions(giraphConf);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Initialize hive input settings. Vertex input is configured only when a
+   * HiveToVertex class is set, and edge input only when a HiveToEdge class
+   * is set; each gets its own input profile and input format class. When
+   * neither is set this method does nothing.
+   *
+   * @param conf Configuration to write to
+   * @throws TException thrift problem
+   */
+  private void setupHiveInputs(GiraphConfiguration conf) throws TException {
+    if (hiveToVertexClass != null) {
+      HiveApiInputFormat.initProfile(conf, hiveVertexInputDescription,
+          HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+      conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+    }
+
+    if (hiveToEdgeClass != null) {
+      HiveApiInputFormat.initProfile(conf, hiveEdgeInputDescription,
+          HiveProfiles.EDGE_INPUT_PROFILE_ID);
+      conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    }
+  }
+
+  /**
+   * Initialize hive output settings. If output is skipped only a warning is
+   * logged. Otherwise, if a VertexToHive class is set, the output profile
+   * and HiveVertexOutputFormat are configured. If output was requested but
+   * no VertexToHive class is set, the problem is logged at fatal level and
+   * no output format is configured; no exception is thrown here.
+   *
+   * @param conf Configuration to write to
+   * @throws TException thrift problem
+   */
+  private void setupHiveOutput(GiraphConfiguration conf) throws TException {
+    if (skipOutput) {
+      LOG.warn("run: Warning - Output will be skipped!");
+    } else if (vertexToHiveClass != null) {
+      HiveApiOutputFormat.initProfile(conf, hiveOutputDescription,
+          HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+      conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
+    } else {
+      LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
+          " not set");
+    }
+  }
+
+  /**
+   * Set hive configuration: ship the client's hive-site.xml ("tmpfiles")
+   * and the jar files listed in the HADOOP_CLASSPATH environment variable
+   * ("tmpjars") along with the job.
+   *
+   * Either source may be absent. A missing hive-site.xml or an unset
+   * HADOOP_CLASSPATH is logged as a warning and skipped instead of failing
+   * with a NullPointerException.
+   */
+  private void adjustConfigurationForHive() {
+    // when output partitions are used, workers register them to the
+    // metastore at cleanup stage, and on HiveConf's initialization, it
+    // looks for hive-site.xml.
+    // getResource() returns null when the file is not on the classpath.
+    java.net.URL hiveSiteXml =
+        conf.getClassLoader().getResource("hive-site.xml");
+    if (hiveSiteXml == null) {
+      LOG.warn("adjustConfigurationForHive: hive-site.xml not found on " +
+          "classpath, not adding it to tmpfiles");
+    } else {
+      addToStringCollection(conf, "tmpfiles", hiveSiteXml.toString());
+    }
+
+    // Or, more effectively, we can provide all the jars client needed to
+    // the workers as well
+    // getenv() returns null when the variable is not defined.
+    String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+    if (hadoopClasspath == null) {
+      LOG.warn("adjustConfigurationForHive: HADOOP_CLASSPATH is not set, " +
+          "not adding any jars to tmpjars");
+      return;
+    }
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopClasspath.split(File.pathSeparator)) {
+      File file = new File(jarPath);
+      // only existing regular files are shipped; directories are ignored
+      if (file.exists() && file.isFile()) {
+        hadoopJarURLs.add(file.toURI().toString());
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+   * Parse the command line and apply it to this runner.
+   *
+   * The vertex class, the number of workers and the output table are
+   * required. A vertex (edge) input table is required only when a
+   * HiveToVertex (HiveToEdge) class is set. Converter classes given on the
+   * command line are applied through their setters, so they are also
+   * written to the configuration exactly as when set programmatically.
+   *
+   * @param args to process
+   * @return CommandLine instance
+   * @throws org.apache.commons.cli.ParseException error parsing arguments
+   * @throws InterruptedException if help was printed (no arguments given or
+   *         the help option present); nothing else has been processed then
+   * @throws IllegalArgumentException if a required option is missing, a
+   *         class does not have the expected type, or the number of workers
+   *         is not an integer
+   */
+  private CommandLine handleCommandLine(String[] args) throws ParseException,
+            InterruptedException {
+    Options options = new Options();
+    addOptions(options);
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    // Giraph classes
+    String vertexClassStr = cmdln.getOptionValue("vertexClass");
+    if (vertexClassStr != null) {
+      vertexClass = findClass(vertexClassStr, Vertex.class);
+    }
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph " + Vertex.class.getSimpleName() +
+              " class name (-vertexClass) to use");
+    }
+
+    // Converter classes go through their setters rather than straight into
+    // the fields, so that each one is recorded in the configuration too.
+    String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
+    if (hiveToVertexClassStr != null) {
+      Class<? extends HiveToVertex> parsedHiveToVertex =
+          findClass(hiveToVertexClassStr, HiveToVertex.class);
+      if (parsedHiveToVertex == null) {
+        throw new IllegalArgumentException(hiveToVertexClassStr +
+            ": Not a " + HiveToVertex.class.getSimpleName() + " class");
+      }
+      setHiveToVertexClass(parsedHiveToVertex);
+    }
+
+    String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
+    if (hiveToEdgeClassStr != null) {
+      Class<? extends HiveToEdge> parsedHiveToEdge =
+          findClass(hiveToEdgeClassStr, HiveToEdge.class);
+      if (parsedHiveToEdge == null) {
+        throw new IllegalArgumentException(hiveToEdgeClassStr +
+            ": Not a " + HiveToEdge.class.getSimpleName() + " class");
+      }
+      setHiveToEdgeClass(parsedHiveToEdge);
+    }
+
+    String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass");
+    if (vertexToHiveClassStr != null) {
+      Class<? extends VertexToHive> parsedVertexToHive =
+          findClass(vertexToHiveClassStr, VertexToHive.class);
+      if (parsedVertexToHive == null) {
+        throw new IllegalArgumentException(vertexToHiveClassStr +
+            ": Not a " + VertexToHive.class.getSimpleName() + " class");
+      }
+      setVertexToHiveClass(parsedVertexToHive);
+    }
+
+    if (cmdln.hasOption("skipOutput")) {
+      skipOutput = true;
+    }
+
+    // NOTE: the presence of input/output converter classes is not validated
+    // here; setupHiveOutput() logs if output was requested without one.
+    String workersStr = cmdln.getOptionValue("workers");
+    if (workersStr == null) {
+      throw new IllegalArgumentException(
+          "Need to choose the number of workers (-w)");
+    }
+
+    String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable");
+    if (vertexInputTableStr == null && hiveToVertexClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the vertex input table name (-vi)");
+    }
+
+    String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable");
+    if (edgeInputTableStr == null && hiveToEdgeClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the edge input table name (-ei)");
+    }
+
+    String outputTableStr = cmdln.getOptionValue("outputTable");
+    if (outputTableStr == null) {
+      throw new IllegalArgumentException(
+          "Need to set the output table name (-o)");
+    }
+
+    String dbName = cmdln.getOptionValue("dbName", "default");
+    hiveVertexInputDescription.setDbName(dbName);
+    hiveEdgeInputDescription.setDbName(dbName);
+    hiveOutputDescription.setDbName(dbName);
+
+    hiveEdgeInputDescription.setPartitionFilter(
+        cmdln.getOptionValue("edgeInputFilter"));
+    hiveEdgeInputDescription.setTableName(edgeInputTableStr);
+
+    hiveVertexInputDescription.setPartitionFilter(
+        cmdln.getOptionValue("vertexInputFilter"));
+    hiveVertexInputDescription.setTableName(vertexInputTableStr);
+
+    hiveOutputDescription.setTableName(outputTableStr);
+    hiveOutputDescription.setPartitionValues(
+        parsePartitionValues(cmdln.getOptionValue("outputPartition"))
+    );
+
+    // Report a non-numeric worker count with a message naming the option
+    // instead of the bare NumberFormatException text.
+    try {
+      workers = Integer.parseInt(workersStr);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "Number of workers (-w) must be an integer, got: " + workersStr, e);
+    }
+
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments
+    processHiveConfOptions(cmdln);
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+   * Process -hiveconf options from command line.
+   *
+   * Each value is expected to look like <code>name=value</code>; values
+   * without an '=' are silently ignored. The <code>tmpjars</code> and
+   * <code>tmpfiles</code> properties are appended to what is already in the
+   * configuration, any other property replaces the existing value.
+   *
+   * @param cmdln Command line options
+   */
+  private void processHiveConfOptions(CommandLine cmdln) {
+    String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+    if (hiveconfs == null) {
+      // getOptionValues() returns null when the option was not given at all,
+      // iterating over it would throw a NullPointerException.
+      return;
+    }
+    for (String hiveconf : hiveconfs) {
+      // Limit of 2 so that '=' characters inside the value are preserved
+      String[] keyval = hiveconf.split("=", 2);
+      if (keyval.length == 2) {
+        String name = keyval[0];
+        String value = keyval[1];
+        if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+          addToStringCollection(conf, name, value);
+        } else {
+          conf.set(name, value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Parse a comma separated list of key=value pairs into a map.
+   *
+   * @param outputTablePartitionString partition values, e.g. "a=1,b=two"
+   * @return Map from partition name to value, null if the input was null
+   */
+  public static Map<String, String> parsePartitionValues(
+      String outputTablePartitionString) {
+    if (outputTablePartitionString == null) {
+      return null;
+    }
+    Map<String, String> result = Maps.newHashMap();
+    Splitter pairSplitter = Splitter.on(',').omitEmptyStrings().trimResults();
+    Splitter partSplitter = Splitter.on('=').omitEmptyStrings().trimResults();
+    for (String pair : pairSplitter.split(outputTablePartitionString)) {
+      List<String> parts = Lists.newArrayList(partSplitter.split(pair));
+      // Exactly one non-empty key and one non-empty value are required
+      if (parts.size() == 2) {
+        result.put(parts.get(0), parts.get(1));
+      } else {
+        throw new IllegalArgumentException(
+            "Unrecognized partition value format: " +
+            outputTablePartitionString);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Add hive-related options to command line parser options.
+   *
+   * The options naming a class (vertexClass, hiveToVertexClass,
+   * hiveToEdgeClass, vertexToHiveClass) are only registered when the
+   * corresponding field has not already been set.
+   *
+   * @param options Options to use
+   */
+  private void addOptions(Options options) {
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+                "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+          "Giraph Vertex class to use");
+    }
+
+    options.addOption("db", "dbName", true, "Hive database name");
+
+    // Vertex input settings
+    if (hiveToVertexClass == null) {
+      options.addOption(null, "hiveToVertexClass", true,
+          "Giraph " + HiveToVertex.class.getSimpleName() + " class to use");
+    }
+    options.addOption("vi", "vertexInputTable", true,
+        "Vertex input table name");
+    options.addOption("VI", "vertexInputFilter", true,
+        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\")");
+
+    // Edge input settings
+    if (hiveToEdgeClass == null) {
+      options.addOption(null, "hiveToEdgeClass", true,
+          "Giraph " + HiveToEdge.class.getSimpleName() + " class to use");
+    }
+    options.addOption("ei", "edgeInputTable", true,
+        "Edge input table name");
+    options.addOption("EI", "edgeInputFilter", true,
+        "Edge input table filter expression (e.g., \"a<2 AND b='two'\")");
+
+    // Vertex output settings
+    if (vertexToHiveClass == null) {
+      options.addOption(null, "vertexToHiveClass", true,
+          "Giraph " + VertexToHive.class.getSimpleName() + " class to use");
+    }
+
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("s", "skipOutput", false, "Skip output?");
+  }
+
+  /**
+   * Append values to a comma separated string collection property.
+   *
+   * @param conf Configuration holding the property
+   * @param name name of the property to append to
+   * @param values values to append
+   */
+  private static void addToStringCollection(Configuration conf, String name,
+                                              String... values) {
+    List<String> valueList = Arrays.asList(values);
+    addToStringCollection(conf, name, valueList);
+  }
+
+  /**
+   * Append values to a string collection property, keeping the entries that
+   * are already stored under that name.
+   *
+   * @param conf Configuration holding the property
+   * @param name name of the property to append to
+   * @param values values to append
+   */
+  private static void addToStringCollection(
+      Configuration conf, String name, Collection<? extends String> values) {
+    Collection<String> merged = conf.getStringCollection(name);
+    merged.addAll(values);
+    String[] mergedArray = merged.toArray(new String[merged.size()]);
+    conf.setStrings(name, mergedArray);
+  }
+
+  /**
+   * Load a class by name and check that it is a subtype of the given base.
+   *
+   * @param className fully qualified name of the class to find
+   * @param base base class or interface the found class must extend
+   * @param <T> class type found
+   * @return the class as a subclass of base, or null if the class exists
+   *         but is not assignable to base
+   * @throws IllegalArgumentException if no class with that name can be loaded
+   */
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      // Keep the original exception as the cause so the stack trace shows
+      // where class loading actually failed.
+      throw new IllegalArgumentException(
+          className + ": Invalid class name", e);
+    }
+    if (base.isAssignableFrom(cls)) {
+      return cls.asSubclass(base);
+    }
+    return null;
+  }
+
+  /**
+   * Get the configuration this runner operates on.
+   *
+   * @return Configuration previously given to {@link #setConf(Configuration)}
+   */
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Set the configuration this runner operates on.
+   *
+   * @param conf Configuration to use
+   */
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Override this method to add more command-line options. You can process
+   * them by also overriding {@link #processMoreArguments(CommandLine)}.
+   * The default implementation adds nothing.
+   *
+   * @param options Options
+   */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+   * Override this method to process additional command-line arguments. You
+   * may want to declare additional options by also overriding
+   * {@link #addMoreOptions(org.apache.commons.cli.Options)}.
+   * The default implementation does nothing.
+   *
+   * @param cmd Command
+   */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+   * Override this method to do additional setup with the GiraphJob that will
+   * run. The default implementation does nothing.
+   *
+   * @param job GiraphJob that is going to run
+   */
+  protected void initGiraphJob(GiraphJob job) { }
+
+  /**
+   * Log the options this runner is going to use.
+   *
+   * @param giraphConf GiraphConfiguration to read the I/O format classes from
+   */
+  private void logOptions(GiraphConfiguration giraphConf) {
+    GiraphClasses classes = new GiraphClasses(giraphConf);
+
+    LOG.info(getClass().getSimpleName() + " with");
+
+    String vertexClassName = vertexClass.getCanonicalName();
+    LOG.info(LOG_PREFIX + "-vertexClass=" + vertexClassName);
+
+    // Vertex input
+    if (hiveToVertexClass != null) {
+      String hiveToVertexName = hiveToVertexClass.getCanonicalName();
+      LOG.info(LOG_PREFIX + "-hiveToVertexClass=" + hiveToVertexName);
+    }
+    String vertexInputFormatName =
+        classes.getVertexInputFormatClass().getCanonicalName();
+    LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" + vertexInputFormatName);
+    logInputDesc(hiveVertexInputDescription, "vertex");
+
+    // Edge input
+    if (hiveToEdgeClass != null) {
+      String hiveToEdgeName = hiveToEdgeClass.getCanonicalName();
+      LOG.info(LOG_PREFIX + "-hiveToEdgeClass=" + hiveToEdgeName);
+    }
+    String edgeInputFormatName =
+        classes.getEdgeInputFormatClass().getCanonicalName();
+    LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" + edgeInputFormatName);
+    logInputDesc(hiveEdgeInputDescription, "edge");
+
+    // Output
+    String outputTableName = hiveOutputDescription.getTableName();
+    LOG.info(LOG_PREFIX + "-outputTable=" + outputTableName);
+    if (hiveOutputDescription.hasPartitionValues()) {
+      LOG.info(LOG_PREFIX + "-outputPartition=\"" +
+          hiveOutputDescription.getPartitionValues() + "\"");
+    }
+    String outputFormatName =
+        classes.getVertexOutputFormatClass().getCanonicalName();
+    LOG.info(LOG_PREFIX + "-outputFormatClass=" + outputFormatName);
+
+    LOG.info(LOG_PREFIX + "-workers=" + workers);
+  }
+
+  /**
+   * Log the table name and partition filter of an input description, each
+   * only if it is present.
+   *
+   * @param inputDesc input description to log
+   * @param name String prefix used in the logged option names
+   */
+  private void logInputDesc(HiveInputDescription inputDesc, String name) {
+    String optionPrefix = LOG_PREFIX + "-" + name;
+    if (inputDesc.hasTableName()) {
+      LOG.info(optionPrefix + "InputTable=" + inputDesc.getTableName());
+    }
+    if (inputDesc.hasPartitionFilter()) {
+      String filter = inputDesc.getPartitionFilter();
+      LOG.info(optionPrefix + "InputFilter=\"" + filter + "\"");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
new file mode 100644
index 0000000..b0ddc48
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+/**
+ * Profile IDs used throughout Hive code. This class only holds constants
+ * and cannot be instantiated.
+ */
+public class HiveProfiles {
+  /** Name of vertex input profile */
+  public static final String VERTEX_INPUT_PROFILE_ID = "vertex_input_profile";
+  /** Name of edge input profile */
+  public static final String EDGE_INPUT_PROFILE_ID = "edge_input_profile";
+  /** Name of vertex value input profile */
+  public static final String VERTEX_VALUE_INPUT_PROFILE_ID =
+      "vertex_value_input_profile";
+  /** Name of vertex output profile */
+  public static final String VERTEX_OUTPUT_PROFILE_ID = "vertex_output_profile";
+
+  /** Disable creation */
+  private HiveProfiles() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
new file mode 100644
index 0000000..1535b18
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Common Hive related utilities.
+ */
+package org.apache.giraph.hive.common;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
new file mode 100644
index 0000000..17405c8
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link EdgeInputFormat} for reading edges from Hive. All the work is
+ * delegated to a {@link HiveApiInputFormat} configured with the edge input
+ * profile.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HiveEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create edge input format.
+   */
+  public HiveEdgeInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+    hiveInputFormat.setMyProfileId(HiveProfiles.EDGE_INPUT_PROFILE_ID);
+  }
+
+  /**
+   * Get the splits from the underlying Hive input format.
+   *
+   * @param context Job context
+   * @param numWorkers Number of workers, not used by this implementation
+   * @return splits as computed by the Hive input format
+   * @throws IOException passed on from the Hive input format
+   * @throws InterruptedException passed on from the Hive input format
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hiveInputFormat.getSplits(context);
+  }
+
+  /**
+   * Create a {@link HiveEdgeReader} on top of the Hive record reader for the
+   * split. The returned reader has already been initialized.
+   *
+   * @param split Split to read
+   * @param context Task attempt context
+   * @return initialized edge reader
+   * @throws IOException if creating or initializing the reader fails
+   * @throws IllegalStateException if interrupted while setting up the reader
+   */
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit split,
+                                           TaskAttemptContext context)
+    throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    RecordReader<WritableComparable, HiveRecord> baseReader;
+    HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
+    reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+
+    try {
+      baseReader = hiveInputFormat.createRecordReader(split, context);
+      reader.setHiveRecordReader(baseReader);
+      reader.initialize(split, context);
+    } catch (InterruptedException e) {
+      // The checked exception is swallowed here, so restore the interrupt
+      // status to let code further up the stack still observe it.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Could not create edge record reader", e);
+    }
+    return reader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
new file mode 100644
index 0000000..34dc4df
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.graph.MutableEdge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+
+import java.io.IOException;
+
+/**
+ * A reader for reading edges from Hive.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
+    implements EdgeReader<I, E> {
+  /** Configuration key for edge creator class */
+  public static final String EDGE_CREATOR_KEY = "giraph.hive.to.edge.class";
+  /** Configuration key for whether to reuse edge */
+  public static final String REUSE_EDGE_KEY = "giraph.hive.reuse.edge";
+
+  /** Underlying Hive RecordReader used */
+  private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+  /** Schema for table in Hive */
+  private HiveTableSchema tableSchema;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf;
+
+  /** User class to create edges from a HiveRecord */
+  private HiveToEdge<I, E> hiveToEdge;
+  /**
+   * If we are reusing edges this will be the single edge to read into.
+   * Otherwise if it's null we will create a new edge each time.
+   */
+  private MutableEdge<I, E> edgeToReuse = null;
+
+  /**
+   * Get underlying Hive record reader used.
+   *
+   * @return RecordReader from Hive
+   */
+  public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+    return hiveRecordReader;
+  }
+
+  /**
+   * Set underlying Hive record reader used.
+   *
+   * @param hiveRecordReader RecordReader to read from Hive.
+   */
+  public void setHiveRecordReader(
+      RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+    this.hiveRecordReader = hiveRecordReader;
+  }
+
+  /**
+   * Get Hive table schema for table being read from.
+   *
+   * @return Hive table schema for table
+   */
+  public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  /**
+   * Set Hive schema for table being read from.
+   *
+   * @param tableSchema Hive table schema
+   */
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  /**
+   * Get our Configuration.
+   *
+   * @return ImmutableClassesGiraphConfiguration
+   */
+  public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
+    return conf;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    hiveRecordReader.initialize(inputSplit, context);
+    conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
+    instantiateHiveToEdgeFromConf();
+    if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
+      edgeToReuse = conf.createMutableEdge();
+    }
+  }
+
+  /**
+   * Retrieve the user's HiveEdgeCreator from the Configuration.
+   *
+   * @throws IOException if anything goes wrong reading from Configuration
+   */
+  private void instantiateHiveToEdgeFromConf() throws IOException {
+    Class<? extends HiveToEdge> klass = conf.getClass(EDGE_CREATOR_KEY,
+        null, HiveToEdge.class);
+    if (klass == null) {
+      throw new IOException(EDGE_CREATOR_KEY + " not set in conf");
+    }
+    hiveToEdge = ReflectionUtils.newInstance(klass, conf);
+    hiveToEdge.setTableSchema(tableSchema);
+  }
+
+  @Override
+  public boolean nextEdge() throws IOException, InterruptedException {
+    return hiveRecordReader.nextKeyValue();
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
+  }
+
+  @Override
+  public I getCurrentSourceId() throws IOException, InterruptedException {
+    return hiveToEdge.getSourceVertexId(hiveRecordReader.getCurrentValue());
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() throws IOException,
+      InterruptedException {
+    HiveRecord record = hiveRecordReader.getCurrentValue();
+    I targetId = hiveToEdge.getTargetVertexId(record);
+    E edgeValue = hiveToEdge.getEdgeValue(record);
+    if (edgeToReuse == null) {
+      return EdgeFactory.create(targetId, edgeValue);
+    } else {
+      edgeToReuse.setTargetVertexId(targetId);
+      edgeToReuse.setValue(edgeValue);
+      return edgeToReuse;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
new file mode 100644
index 0000000..e800321
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * An interface used to create edges from Hive records. Each method reads
+ * one part of the edge (source vertex ID, target vertex ID, edge value)
+ * from a single Hive record.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public interface HiveToEdge<I extends WritableComparable, E extends Writable>
+    extends HiveTableSchemaAware {
+  /**
+   * Read source vertex ID from Hive record
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return source vertex ID
+   */
+  I getSourceVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Read target vertex ID from Hive record
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return target vertex ID
+   */
+  I getTargetVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Read edge value from the Hive record.
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return Edge value
+   */
+  E getEdgeValue(HiveReadableRecord hiveRecord);
+}


Mime
View raw message