Updated Branches:
refs/heads/trunk ea914c9c0 -> 92517430e
GIRAPH-453: Pure Hive I/O (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/92517430
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/92517430
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/92517430
Branch: refs/heads/trunk
Commit: 92517430e6c80d9de19aedf1aafc57367d5240f0
Parents: ea914c9
Author: Nitay Joffe <nitayj@fb.com>
Authored: Wed Nov 14 18:51:50 2012 -0800
Committer: Nitay Joffe <nitay@apache.org>
Committed: Thu Feb 21 22:25:32 2013 -0500
----------------------------------------------------------------------
CHANGELOG | 2 +
giraph-accumulo/pom.xml | 1 -
.../org/apache/giraph/comm/netty/NettyClient.java | 2 +-
.../apache/giraph/conf/GiraphConfiguration.java | 4 +-
giraph-hbase/pom.xml | 1 -
giraph-hcatalog/pom.xml | 1 -
.../giraph/io/hcatalog/HCatGiraphRunner.java | 490 ++++++++++
.../giraph/io/hcatalog/HiveGiraphRunner.java | 490 ----------
.../org/apache/giraph/io/hcatalog/HiveUtils.java | 2 +-
giraph-hive/pom.xml | 137 +++
giraph-hive/src/main/assembly/compile.xml | 39 +
.../org/apache/giraph/hive/HiveGiraphRunner.java | 705 +++++++++++++++
.../apache/giraph/hive/common/HiveProfiles.java | 37 +
.../apache/giraph/hive/common/package-info.java | 21 +
.../hive/input/edge/HiveEdgeInputFormat.java | 82 ++
.../giraph/hive/input/edge/HiveEdgeReader.java | 173 ++++
.../apache/giraph/hive/input/edge/HiveToEdge.java | 58 ++
.../giraph/hive/input/edge/package-info.java | 21 +
.../org/apache/giraph/hive/input/package-info.java | 21 +
.../giraph/hive/input/vertex/HiveToVertex.java | 45 +
.../hive/input/vertex/HiveVertexInputFormat.java | 84 ++
.../giraph/hive/input/vertex/HiveVertexReader.java | 146 +++
.../giraph/hive/input/vertex/package-info.java | 21 +
.../giraph/hive/output/HiveVertexOutputFormat.java | 83 ++
.../giraph/hive/output/HiveVertexWriter.java | 136 +++
.../apache/giraph/hive/output/VertexToHive.java | 44 +
.../apache/giraph/hive/output/package-info.java | 22 +
.../java/org/apache/giraph/hive/package-info.java | 21 +
pom.xml | 47 +-
29 files changed, 2434 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 742b6ea..60dcbf1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-453: Pure Hive I/O (nitay)
+
GIRAPH-526: HiveGiraphRunner - bug with setting database name (majakabiljo)
GIRAPH-518: Support Hadoop-2.0.3-alpha release on Giraph (ereisman)
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-accumulo/pom.xml b/giraph-accumulo/pom.xml
index cb9fbc0..2734f71 100644
--- a/giraph-accumulo/pom.xml
+++ b/giraph-accumulo/pom.xml
@@ -205,7 +205,6 @@ under the License.
<dependency>
<groupId>org.apache.giraph</groupId>
<artifactId>giraph-core</artifactId>
- <version>0.2-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 89ef87f..feae3e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -126,7 +126,7 @@ public class NettyClient {
private final boolean limitNumberOfOpenRequests;
/** Maximum number of requests without confirmation we can have */
private final int maxNumberOfOpenRequests;
- /** Maximum number of connnection failures */
+ /** Maximum number of connection failures */
private final int maxConnectionFailures;
/** Maximum number of milliseconds for a request */
private final int maxRequestMilliseconds;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ddeaeb7..3ea8d3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -242,9 +242,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexCombinerClass(
Class<? extends Combiner> vertexCombinerClass) {
- setClass(VERTEX_COMBINER_CLASS,
- vertexCombinerClass,
- Combiner.class);
+ setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, Combiner.class);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hbase/pom.xml b/giraph-hbase/pom.xml
index 7bbbd98..3d24f3b 100644
--- a/giraph-hbase/pom.xml
+++ b/giraph-hbase/pom.xml
@@ -213,7 +213,6 @@ under the License.
<dependency>
<groupId>org.apache.giraph</groupId>
<artifactId>giraph-core</artifactId>
- <version>0.2-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/pom.xml b/giraph-hcatalog/pom.xml
index 019f020..b4ca06c 100644
--- a/giraph-hcatalog/pom.xml
+++ b/giraph-hcatalog/pom.xml
@@ -186,7 +186,6 @@ under the License.
<dependency>
<groupId>org.apache.giraph</groupId>
<artifactId>giraph-core</artifactId>
- <version>0.2-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
new file mode 100644
index 0000000..1bd0235
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatGiraphRunner.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Giraph runner that reads its input from and writes its output to Hive
+ * tables through HCatalog.
+ */
+public class HCatGiraphRunner implements Tool {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
+  /** Number of workers (-w) */
+  protected int workers;
+  /** Whether to run the job in verbose mode (-v) */
+  protected boolean isVerbose;
+  /** Output table partition values (-O) */
+  protected Map<String, String> outputTablePartitionValues;
+  /** Hive database name (-db) */
+  protected String dbName;
+  /** Vertex input table name (-vi) */
+  protected String vertexInputTableName;
+  /** Vertex input table filter expression (-VI) */
+  protected String vertexInputTableFilterExpr;
+  /** Edge input table name (-ei) */
+  protected String edgeInputTableName;
+  /** Edge input table filter expression (-EI) */
+  protected String edgeInputTableFilterExpr;
+  /** Output table name (-o) */
+  protected String outputTableName;
+  /** Configuration */
+  private Configuration conf;
+  /** Skip output? (Useful for testing without writing) */
+  private boolean skipOutput = false;
+
+  /** Vertex class */
+  private Class<? extends Vertex> vertexClass;
+  /** Vertex input format class (null if vertices are not read from Hive) */
+  private Class<? extends VertexInputFormat> vertexInputFormatClass;
+  /** Edge input format class (null if edges are not read from Hive) */
+  private Class<? extends EdgeInputFormat> edgeInputFormatClass;
+  /** Vertex output format class */
+  private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
+
+  /**
+   * Constructor. Any class passed as null here is instead read from the
+   * command line (e.g. -vertexClass).
+   *
+   * @param vertexClass Vertex class
+   * @param vertexInputFormatClass Vertex input format
+   * @param edgeInputFormatClass Edge input format
+   * @param vertexOutputFormatClass Output format
+   */
+  protected HCatGiraphRunner(
+      Class<? extends Vertex> vertexClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends EdgeInputFormat> edgeInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+    this.vertexClass = vertexClass;
+    this.vertexInputFormatClass = vertexInputFormatClass;
+    this.edgeInputFormatClass = edgeInputFormatClass;
+    this.vertexOutputFormatClass = vertexOutputFormatClass;
+    this.conf = new HiveConf(getClass());
+  }
+
+  /**
+   * Main method; all Giraph classes must be given on the command line.
+   *
+   * @param args system arguments
+   * @throws Exception any errors from HCat Giraph Runner
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(
+        new HCatGiraphRunner(null, null, null, null), args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    // process args
+    try {
+      processArguments(args);
+    } catch (InterruptedException e) {
+      // Help was printed (-h or no arguments); nothing else to do
+      return 0;
+    } catch (ParseException e) {
+      // Unknown option or missing option value: report it like the other
+      // usage errors instead of escaping with a stack trace
+      System.err.println(e.getMessage());
+      return -1;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive
+    adjustConfigurationForHive(getConf());
+
+    // setup GiraphJob
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(vertexClass);
+
+    // setup input from Hive
+    if (vertexInputFormatClass != null) {
+      InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
+          vertexInputTableName, vertexInputTableFilterExpr);
+      GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
+          vertexInputJobInfo);
+      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+    }
+    if (edgeInputFormatClass != null) {
+      InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
+          edgeInputTableName, edgeInputTableFilterExpr);
+      GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
+          edgeInputJobInfo);
+      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
+    }
+
+    // setup output to Hive
+    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+        dbName, outputTableName, outputTablePartitionValues));
+    HCatOutputFormat.setSchema(job.getInternalJob(),
+        HCatOutputFormat.getTableSchema(job.getInternalJob()));
+    if (skipOutput) {
+      LOG.warn("run: Warning - Output will be skipped!");
+    } else {
+      job.getConfiguration().setVertexOutputFormatClass(
+          vertexOutputFormatClass);
+    }
+
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Ship the client's hive-site.xml and jars to the workers.
+   *
+   * @param conf Configuration argument
+   */
+  private static void adjustConfigurationForHive(Configuration conf) {
+    // when output partitions are used, workers register them to the
+    // metastore at cleanup stage, and on HiveConf's initialization, it
+    // looks for hive-site.xml from the classpath.
+    URL hiveSiteXml = conf.getClassLoader().getResource("hive-site.xml");
+    if (hiveSiteXml == null) {
+      LOG.warn("adjustConfigurationForHive: hive-site.xml not found on " +
+          "the classpath, not shipping it to the workers");
+    } else {
+      addToStringCollection(conf, "tmpfiles", hiveSiteXml.toString());
+    }
+
+    // Also, you need hive.aux.jars as well
+    // addToStringCollection(conf, "tmpjars",
+    //     conf.getStringCollection("hive.aux.jars.path"));
+
+    // Or, more effectively, we can provide all the jars client needed to
+    // the workers as well
+    String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+    if (hadoopClasspath == null) {
+      LOG.warn("adjustConfigurationForHive: HADOOP_CLASSPATH is not set, " +
+          "not shipping any client jars to the workers");
+      return;
+    }
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopClasspath.split(File.pathSeparator)) {
+      File file = new File(jarPath);
+      // skip directories and entries that do not exist
+      if (file.exists() && file.isFile()) {
+        hadoopJarURLs.add(file.toURI().toString());
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+   * Parse the command line and populate this runner's settings from it.
+   *
+   * @param args to process
+   * @return CommandLine instance
+   * @throws ParseException error parsing arguments
+   * @throws InterruptedException if help was requested (or there were no
+   *         arguments) and the usage message has been printed
+   */
+  private CommandLine processArguments(String[] args) throws ParseException,
+      InterruptedException {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+        "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+          "Giraph Vertex class to use");
+    }
+    if (vertexInputFormatClass == null) {
+      options.addOption(null, "vertexInputFormatClass", true,
+          "Giraph HCatalogVertexInputFormat class to use");
+    }
+    if (edgeInputFormatClass == null) {
+      options.addOption(null, "edgeInputFormatClass", true,
+          "Giraph HCatalogEdgeInputFormat class to use");
+    }
+
+    if (vertexOutputFormatClass == null) {
+      options.addOption(null, "vertexOutputFormatClass", true,
+          "Giraph HCatalogVertexOutputFormat class to use");
+    }
+
+    options.addOption("db", "dbName", true, "Hive database name");
+    options.addOption("vi", "vertexInputTable", true,
+        "Vertex input table name");
+    options.addOption("VI", "vertexInputFilter", true,
+        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\")");
+    options.addOption("ei", "edgeInputTable", true,
+        "Edge input table name");
+    options.addOption("EI", "edgeInputFilter", true,
+        "Edge input table filter expression (e.g., \"a<2 AND b='two'\")");
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("s", "skipOutput", false, "Skip output?");
+
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    // Giraph classes
+    if (cmdln.hasOption("vertexClass")) {
+      vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+          Vertex.class);
+    }
+    if (cmdln.hasOption("vertexInputFormatClass")) {
+      vertexInputFormatClass = findClass(
+          cmdln.getOptionValue("vertexInputFormatClass"),
+          HCatalogVertexInputFormat.class);
+    }
+    if (cmdln.hasOption("edgeInputFormatClass")) {
+      edgeInputFormatClass = findClass(
+          cmdln.getOptionValue("edgeInputFormatClass"),
+          HCatalogEdgeInputFormat.class);
+    }
+
+    if (cmdln.hasOption("vertexOutputFormatClass")) {
+      vertexOutputFormatClass = findClass(
+          cmdln.getOptionValue("vertexOutputFormatClass"),
+          HCatalogVertexOutputFormat.class);
+    }
+
+    if (cmdln.hasOption("skipOutput")) {
+      skipOutput = true;
+    }
+
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph Vertex class name (-vertexClass) to use");
+    }
+    if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need at least one of Giraph VertexInputFormat " +
+          "class name (-vertexInputFormatClass) and " +
+          "EdgeInputFormat class name (-edgeInputFormatClass)");
+    }
+    if (vertexOutputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph VertexOutputFormat " +
+          "class name (-vertexOutputFormatClass) to use");
+    }
+    if (!cmdln.hasOption("workers")) {
+      throw new IllegalArgumentException(
+          "Need to choose the number of workers (-w)");
+    }
+    if (!cmdln.hasOption("vertexInputTable") &&
+        vertexInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the vertex input table name (-vi)");
+    }
+    if (!cmdln.hasOption("edgeInputTable") &&
+        edgeInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the edge input table name (-ei)");
+    }
+    if (!cmdln.hasOption("outputTable")) {
+      throw new IllegalArgumentException(
+          "Need to set the output table name (-o)");
+    }
+    dbName = cmdln.getOptionValue("dbName", "default");
+    vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
+    vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
+    edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
+    edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
+    outputTableName = cmdln.getOptionValue("outputTable");
+    outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
+        .getOptionValue("outputPartition"));
+    workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments; getOptionValues() returns null (not an
+    // empty array) when the option was not given at all
+    String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+    if (hiveconfs != null) {
+      for (String hiveconf : hiveconfs) {
+        String[] keyval = hiveconf.split("=", 2);
+        if (keyval.length == 2) {
+          String name = keyval[0];
+          String value = keyval[1];
+          if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+            addToStringCollection(conf, name, value);
+          } else {
+            conf.set(name, value);
+          }
+        } else {
+          LOG.warn("processArguments: ignoring -hiveconf argument " +
+              "without '=': " + hiveconf);
+        }
+      }
+    }
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+   * Append values to the string-list configuration entry {@code name}.
+   *
+   * @param conf Configuration
+   * @param name name of the configuration entry
+   * @param values values to append
+   */
+  private static void addToStringCollection(Configuration conf, String name,
+      String... values) {
+    addToStringCollection(conf, name, Arrays.asList(values));
+  }
+
+  /**
+   * Append values to the string-list configuration entry {@code name}.
+   *
+   * @param conf Configuration
+   * @param name name of the configuration entry
+   * @param values values to append
+   */
+  private static void addToStringCollection(
+      Configuration conf, String name, Collection<? extends String> values) {
+    Collection<String> current = conf.getStringCollection(name);
+    current.addAll(values);
+    conf.setStrings(name, current.toArray(new String[current.size()]));
+  }
+
+  /**
+   * Load a class by name and check that it is a subclass of the given base.
+   *
+   * @param className to find
+   * @param base base class
+   * @param <T> class type found
+   * @return type found
+   * @throws IllegalArgumentException if the class cannot be loaded or is not
+   *         a subclass of {@code base}
+   */
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(className + ": Invalid class name");
+    }
+    // Rejecting here (instead of returning null) keeps a wrong class from
+    // being silently ignored or reported as "missing" later on
+    if (!base.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException(className + ": Not a subclass of " +
+          base.getName());
+    }
+    return cls.asSubclass(base);
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Override this method to add more command-line options. You can process
+   * them by also overriding {@link #processMoreArguments(CommandLine)}.
+   *
+   * @param options Options
+   */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+   * Override this method to process additional command-line arguments. You
+   * may want to declare additional options by also overriding
+   * {@link #addMoreOptions(Options)}.
+   *
+   * @param cmd Command
+   */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+   * Override this method to do additional setup with the GiraphJob that will
+   * run. The default implementation logs the job settings.
+   *
+   * @param job GiraphJob that is going to run
+   */
+  protected void initGiraphJob(GiraphJob job) {
+    LOG.info(getClass().getSimpleName() + " with");
+    String prefix = "\t";
+    LOG.info(prefix + "-vertexClass=" +
+        vertexClass.getCanonicalName());
+    if (vertexInputFormatClass != null) {
+      LOG.info(prefix + "-vertexInputFormatClass=" +
+          vertexInputFormatClass.getCanonicalName());
+    }
+    if (edgeInputFormatClass != null) {
+      LOG.info(prefix + "-edgeInputFormatClass=" +
+          edgeInputFormatClass.getCanonicalName());
+    }
+    LOG.info(prefix + "-vertexOutputFormatClass=" +
+        vertexOutputFormatClass.getCanonicalName());
+    if (vertexInputTableName != null) {
+      LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
+    }
+    if (vertexInputTableFilterExpr != null) {
+      LOG.info(prefix + "-vertexInputFilter=\"" +
+          vertexInputTableFilterExpr + "\"");
+    }
+    if (edgeInputTableName != null) {
+      LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
+    }
+    if (edgeInputTableFilterExpr != null) {
+      LOG.info(prefix + "-edgeInputFilter=\"" +
+          edgeInputTableFilterExpr + "\"");
+    }
+    LOG.info(prefix + "-outputTable=" + outputTableName);
+    if (outputTablePartitionValues != null) {
+      LOG.info(prefix + "-outputPartition=\"" +
+          outputTablePartitionValues + "\"");
+    }
+    LOG.info(prefix + "-workers=" + workers);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
deleted file mode 100644
index 313bab0..0000000
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Hive Giraph Runner
- */
-public class HiveGiraphRunner implements Tool {
- /**
- * logger
- */
- private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
- /**
- * workers
- */
- protected int workers;
- /**
- * is verbose
- */
- protected boolean isVerbose;
- /**
- * output table partitions
- */
- protected Map<String, String> outputTablePartitionValues;
- /**
- * dbName
- */
- protected String dbName;
- /**
- * vertex input table name
- */
- protected String vertexInputTableName;
- /**
- * vertex input table filter
- */
- protected String vertexInputTableFilterExpr;
- /**
- * edge input table name
- */
- protected String edgeInputTableName;
- /**
- * edge input table filter
- */
- protected String edgeInputTableFilterExpr;
- /**
- * output table name
- */
- protected String outputTableName;
- /** Configuration */
- private Configuration conf;
- /** Skip output? (Useful for testing without writing) */
- private boolean skipOutput = false;
-
- /**
- * vertex class.
- */
- private Class<? extends Vertex> vertexClass;
- /**
- * vertex input format internal.
- */
- private Class<? extends VertexInputFormat> vertexInputFormatClass;
- /**
- * edge input format internal.
- */
- private Class<? extends EdgeInputFormat> edgeInputFormatClass;
- /**
- * vertex output format internal.
- */
- private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
-
- /**
- * Giraph runner class.
- *
- * @param vertexClass Vertex class
- * @param vertexInputFormatClass Vertex input format
- * @param edgeInputFormatClass Edge input format
- * @param vertexOutputFormatClass Output format
- */
- protected HiveGiraphRunner(
- Class<? extends Vertex> vertexClass,
- Class<? extends VertexInputFormat> vertexInputFormatClass,
- Class<? extends EdgeInputFormat> edgeInputFormatClass,
- Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
- this.vertexClass = vertexClass;
- this.vertexInputFormatClass = vertexInputFormatClass;
- this.edgeInputFormatClass = edgeInputFormatClass;
- this.vertexOutputFormatClass = vertexOutputFormatClass;
- this.conf = new HiveConf(getClass());
- }
-
- /**
- * main method
- * @param args system arguments
- * @throws Exception any errors from Hive Giraph Runner
- */
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(
- new HiveGiraphRunner(null, null, null, null), args));
- }
-
- @Override
- public final int run(String[] args) throws Exception {
- // process args
- try {
- processArguments(args);
- } catch (InterruptedException e) {
- return 0;
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- return -1;
- }
-
- // additional configuration for Hive
- adjustConfigurationForHive(getConf());
-
- // setup GiraphJob
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setVertexClass(vertexClass);
-
- // setup input from Hive
- if (vertexInputFormatClass != null) {
- InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
- vertexInputTableName, vertexInputTableFilterExpr);
- GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
- vertexInputJobInfo);
- job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
- }
- if (edgeInputFormatClass != null) {
- InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
- edgeInputTableName, edgeInputTableFilterExpr);
- GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
- edgeInputJobInfo);
- job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
- }
-
- // setup output to Hive
- HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
- dbName, outputTableName, outputTablePartitionValues));
- HCatOutputFormat.setSchema(job.getInternalJob(),
- HCatOutputFormat.getTableSchema(job.getInternalJob()));
- if (skipOutput) {
- LOG.warn("run: Warning - Output will be skipped!");
- } else {
- job.getConfiguration().setVertexOutputFormatClass(
- vertexOutputFormatClass);
- }
-
- job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
- initGiraphJob(job);
-
- return job.run(isVerbose) ? 0 : -1;
- }
-
- /**
- * set hive configuration
- * @param conf Configuration argument
- */
- private static void adjustConfigurationForHive(Configuration conf) {
- // when output partitions are used, workers register them to the
- // metastore at cleanup stage, and on HiveConf's initialization, it
- // looks for hive-site.xml from.
- addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
- .getResource("hive-site.xml").toString());
-
- // Also, you need hive.aux.jars as well
- // addToStringCollection(conf, "tmpjars",
- // conf.getStringCollection("hive.aux.jars.path"));
-
- // Or, more effectively, we can provide all the jars client needed to
- // the workers as well
- String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
- File.pathSeparator);
- List<String> hadoopJarURLs = Lists.newArrayList();
- for (String jarPath : hadoopJars) {
- File file = new File(jarPath);
- if (file.exists() && file.isFile()) {
- String jarURL = file.toURI().toString();
- hadoopJarURLs.add(jarURL);
- }
- }
- addToStringCollection(conf, "tmpjars", hadoopJarURLs);
- }
-
- /**
- * process arguments
- * @param args to process
- * @return CommandLine instance
- * @throws ParseException error parsing arguments
- * @throws InterruptedException interrupted
- */
- private CommandLine processArguments(String[] args) throws ParseException,
- InterruptedException {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("D", "hiveconf", true,
- "property=value for Hive/Hadoop configuration");
- options.addOption("w", "workers", true, "Number of workers");
- if (vertexClass == null) {
- options.addOption(null, "vertexClass", true,
- "Giraph Vertex class to use");
- }
- if (vertexInputFormatClass == null) {
- options.addOption(null, "vertexInputFormatClass", true,
- "Giraph HCatalogVertexInputFormat class to use");
- }
- if (edgeInputFormatClass == null) {
- options.addOption(null, "edgeInputFormatClass", true,
- "Giraph HCatalogEdgeInputFormat class to use");
- }
-
- if (vertexOutputFormatClass == null) {
- options.addOption(null, "vertexOutputFormatClass", true,
- "Giraph HCatalogVertexOutputFormat class to use");
- }
-
- options.addOption("db", "dbName", true, "Hive database name");
- options.addOption("vi", "vertexInputTable", true,
- "Vertex input table name");
- options.addOption("VI", "vertexInputFilter", true,
- "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
- options.addOption("ei", "edgeInputTable", true,
- "Edge input table name");
- options.addOption("EI", "edgeInputFilter", true,
- "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
- options.addOption("o", "outputTable", true, "Output table name");
- options.addOption("O", "outputPartition", true,
- "Output table partition values (e.g., \"a=1,b=two\")");
- options.addOption("s", "skipOutput", false, "Skip output?");
-
- addMoreOptions(options);
-
- CommandLineParser parser = new GnuParser();
- final CommandLine cmdln = parser.parse(options, args);
- if (args.length == 0 || cmdln.hasOption("help")) {
- new HelpFormatter().printHelp(getClass().getName(), options, true);
- throw new InterruptedException();
- }
-
- // Giraph classes
- if (cmdln.hasOption("vertexClass")) {
- vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
- Vertex.class);
- }
- if (cmdln.hasOption("vertexInputFormatClass")) {
- vertexInputFormatClass = findClass(
- cmdln.getOptionValue("vertexInputFormatClass"),
- HCatalogVertexInputFormat.class);
- }
- if (cmdln.hasOption("edgeInputFormatClass")) {
- edgeInputFormatClass = findClass(
- cmdln.getOptionValue("edgeInputFormatClass"),
- HCatalogEdgeInputFormat.class);
- }
-
- if (cmdln.hasOption("vertexOutputFormatClass")) {
- vertexOutputFormatClass = findClass(
- cmdln.getOptionValue("vertexOutputFormatClass"),
- HCatalogVertexOutputFormat.class);
- }
-
- if (cmdln.hasOption("skipOutput")) {
- skipOutput = true;
- }
-
- if (vertexClass == null) {
- throw new IllegalArgumentException(
- "Need the Giraph Vertex class name (-vertexClass) to use");
- }
- if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
- throw new IllegalArgumentException(
- "Need at least one of Giraph VertexInputFormat " +
- "class name (-vertexInputFormatClass) and " +
- "EdgeInputFormat class name (-edgeInputFormatClass)");
- }
- if (vertexOutputFormatClass == null) {
- throw new IllegalArgumentException(
- "Need the Giraph VertexOutputFormat " +
- "class name (-vertexOutputFormatClass) to use");
- }
- if (!cmdln.hasOption("workers")) {
- throw new IllegalArgumentException(
- "Need to choose the number of workers (-w)");
- }
- if (!cmdln.hasOption("vertexInputTable") &&
- vertexInputFormatClass != null) {
- throw new IllegalArgumentException(
- "Need to set the vertex input table name (-vi)");
- }
- if (!cmdln.hasOption("edgeInputTable") &&
- edgeInputFormatClass != null) {
- throw new IllegalArgumentException(
- "Need to set the edge input table name (-ei)");
- }
- if (!cmdln.hasOption("outputTable")) {
- throw new IllegalArgumentException(
- "Need to set the output table name (-o)");
- }
- dbName = cmdln.getOptionValue("dbName", "default");
- vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
- vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
- edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
- edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
- outputTableName = cmdln.getOptionValue("outputTable");
- outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
- .getOptionValue("outputPartition"));
- workers = Integer.parseInt(cmdln.getOptionValue("workers"));
- isVerbose = cmdln.hasOption("verbose");
-
- // pick up -hiveconf arguments
- for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
- String[] keyval = hiveconf.split("=", 2);
- if (keyval.length == 2) {
- String name = keyval[0];
- String value = keyval[1];
- if (name.equals("tmpjars") || name.equals("tmpfiles")) {
- addToStringCollection(
- conf, name, value);
- } else {
- conf.set(name, value);
- }
- }
- }
-
- processMoreArguments(cmdln);
-
- return cmdln;
- }
-
- /**
- * add string to collection
- * @param conf Configuration
- * @param name name to add
- * @param values values for collection
- */
- private static void addToStringCollection(Configuration conf, String name,
- String... values) {
- addToStringCollection(conf, name, Arrays.asList(values));
- }
-
- /**
- * add string to collection
- * @param conf Configuration
- * @param name to add
- * @param values values for collection
- */
- private static void addToStringCollection(
- Configuration conf, String name, Collection
- <? extends String> values) {
- Collection<String> tmpfiles = conf.getStringCollection(name);
- tmpfiles.addAll(values);
- conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
- }
-
- /**
- *
- * @param className to find
- * @param base base class
- * @param <T> class type found
- * @return type found
- */
- private <T> Class<? extends T> findClass(String className, Class<T> base) {
- try {
- Class<?> cls = Class.forName(className);
- if (base.isAssignableFrom(cls)) {
- return cls.asSubclass(base);
- }
- return null;
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(className + ": Invalid class name");
- }
- }
-
- @Override
- public final Configuration getConf() {
- return conf;
- }
-
- @Override
- public final void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Override this method to add more command-line options. You can process
- * them by also overriding {@link #processMoreArguments(CommandLine)}.
- *
- * @param options Options
- */
- protected void addMoreOptions(Options options) {
- }
-
- /**
- * Override this method to process additional command-line arguments. You
- * may want to declare additional options by also overriding
- * {@link #addMoreOptions(Options)}.
- *
- * @param cmd Command
- */
- protected void processMoreArguments(CommandLine cmd) {
- }
-
- /**
- * Override this method to do additional setup with the GiraphJob that will
- * run.
- *
- * @param job
- * GiraphJob that is going to run
- */
- protected void initGiraphJob(GiraphJob job) {
- LOG.info(getClass().getSimpleName() + " with");
- String prefix = "\t";
- LOG.info(prefix + "-vertexClass=" +
- vertexClass.getCanonicalName());
- if (vertexInputFormatClass != null) {
- LOG.info(prefix + "-vertexInputFormatClass=" +
- vertexInputFormatClass.getCanonicalName());
- }
- if (edgeInputFormatClass != null) {
- LOG.info(prefix + "-edgeInputFormatClass=" +
- edgeInputFormatClass.getCanonicalName());
- }
- LOG.info(prefix + "-vertexOutputFormatClass=" +
- vertexOutputFormatClass.getCanonicalName());
- if (vertexInputTableName != null) {
- LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
- }
- if (vertexInputTableFilterExpr != null) {
- LOG.info(prefix + "-vertexInputFilter=\"" +
- vertexInputTableFilterExpr + "\"");
- }
- if (edgeInputTableName != null) {
- LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
- }
- if (edgeInputTableFilterExpr != null) {
- LOG.info(prefix + "-edgeInputFilter=\"" +
- edgeInputTableFilterExpr + "\"");
- }
- LOG.info(prefix + "-outputTable=" + outputTableName);
- if (outputTablePartitionValues != null) {
- LOG.info(prefix + "-outputPartition=\"" +
- outputTablePartitionValues + "\"");
- }
- LOG.info(prefix + "-workers=" + workers);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
index c1f76f1..fedd92c 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
@@ -43,7 +43,7 @@ public class HiveUtils {
* @return Map
*/
public static Map<String, String> parsePartitionValues(
- String outputTablePartitionString) {
+ String outputTablePartitionString) {
if (outputTablePartitionString == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hive/pom.xml b/giraph-hive/pom.xml
new file mode 100644
index 0000000..84b7c91
--- /dev/null
+++ b/giraph-hive/pom.xml
@@ -0,0 +1,137 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-parent</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>giraph-hive</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Giraph Hive I/O</name>
+
+ <properties>
+ <top.dir>${project.basedir}/..</top.dir>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <configLocation>${top.dir}/checkstyle.xml</configLocation>
+ <headerLocation>${top.dir}/license-header.txt</headerLocation>
+ <enableRulesSummary>false</enableRulesSummary>
+ <failOnError>true</failOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>prop.jarLocation</name>
+ <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <xmlOutput>true</xmlOutput>
+ <findbugsXmlOutput>false</findbugsXmlOutput>
+ <excludeFilterFile>${top.dir}/findbugs-exclude.xml</excludeFilterFile>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- compile dependencies. sorted lexicographically. -->
+ <dependency>
+ <groupId>com.facebook.giraph.hive</groupId>
+ <artifactId>hive-io-experimental</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.spullara.cli-parser</groupId>
+ <artifactId>cli-parser</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ </dependency>
+
+ <!-- test dependencies. sorted lexicographically. -->
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/assembly/compile.xml b/giraph-hive/src/main/assembly/compile.xml
new file mode 100644
index 0000000..fcaffa6
--- /dev/null
+++ b/giraph-hive/src/main/assembly/compile.xml
@@ -0,0 +1,39 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>/</outputDirectory>
+ <unpackOptions>
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ </excludes>
+ </unpackOptions>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
new file mode 100644
index 0000000..cef96b5
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
+import org.apache.giraph.hive.input.edge.HiveEdgeReader;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
+import org.apache.giraph.hive.input.vertex.HiveVertexReader;
+import org.apache.giraph.hive.output.HiveVertexOutputFormat;
+import org.apache.giraph.hive.output.HiveVertexWriter;
+import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+import com.facebook.giraph.hive.input.HiveInputDescription;
+import com.facebook.giraph.hive.output.HiveApiOutputFormat;
+import com.facebook.giraph.hive.output.HiveOutputDescription;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hive Giraph Runner
+ */
+public class HiveGiraphRunner implements Tool {
+  /** logger */
+  private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
+  /** Prefix for log statements */
+  private static final String LOG_PREFIX = "\t";
+
+  /** workers */
+  protected int workers;
+  /** is verbose */
+  protected boolean isVerbose;
+
+  /** vertex class. */
+  private Class<? extends Vertex> vertexClass;
+
+  /** Vertex creator from hive records. */
+  private Class<? extends HiveToVertex> hiveToVertexClass;
+  /** hive vertex input information */
+  private final HiveInputDescription hiveVertexInputDescription;
+
+  /** Edge creator from hive records. */
+  private Class<? extends HiveToEdge> hiveToEdgeClass;
+  /** hive edge input information */
+  private final HiveInputDescription hiveEdgeInputDescription;
+
+  /** Hive Vertex writer */
+  private Class<? extends VertexToHive> vertexToHiveClass;
+  /** hive output information */
+  private final HiveOutputDescription hiveOutputDescription;
+  /** Skip output? (Useful for testing without writing) */
+  private boolean skipOutput = false;
+
+  /** Configuration */
+  private Configuration conf;
+
+  /** Create a new runner backed by a fresh {@link HiveConf}. */
+  public HiveGiraphRunner() {
+    conf = new HiveConf(getClass());
+    hiveVertexInputDescription = new HiveInputDescription();
+    hiveEdgeInputDescription = new HiveInputDescription();
+    hiveOutputDescription = new HiveOutputDescription();
+  }
+
+  /**
+   * Get Vertex class used.
+   *
+   * @return Vertex class
+   */
+  public Class<? extends Vertex> getVertexClass() {
+    return vertexClass;
+  }
+
+  /**
+   * Set Vertex class to use
+   *
+   * @param vertexClass Vertex class
+   */
+  public void setVertexClass(Class<? extends Vertex> vertexClass) {
+    this.vertexClass = vertexClass;
+  }
+
+  /**
+   * Get hive vertex input description
+   *
+   * @return HiveInputDescription
+   */
+  public HiveInputDescription getHiveVertexInputDescription() {
+    return hiveVertexInputDescription;
+  }
+
+  /**
+   * Get hive output description
+   *
+   * @return HiveOutputDescription
+   */
+  public HiveOutputDescription getHiveOutputDescription() {
+    return hiveOutputDescription;
+  }
+
+  /**
+   * Get hive edge input description
+   *
+   * @return HiveInputDescription
+   */
+  public HiveInputDescription getHiveEdgeInputDescription() {
+    return hiveEdgeInputDescription;
+  }
+
+  /**
+   * Get HiveToVertex class used with HiveVertexInputFormat
+   *
+   * @return HiveToVertex class, or null if vertex input is disabled
+   */
+  public Class<? extends HiveToVertex> getHiveToVertexClass() {
+    return hiveToVertexClass;
+  }
+
+  /**
+   * Whether to use vertex input.
+   *
+   * @return true if vertex input enabled (HiveToVertex class is set).
+   */
+  public boolean hasVertexInput() {
+    return hiveToVertexClass != null;
+  }
+
+  /**
+   * Set class used to convert hive records to Vertexes.
+   * Alias for {@link #setHiveToVertexClass(Class)}.
+   *
+   * @param hiveToVertexValueClass HiveToVertex class
+   */
+  public void setHiveToVertexValueClass(
+      Class<? extends HiveToVertex> hiveToVertexValueClass) {
+    setHiveToVertexClass(hiveToVertexValueClass);
+  }
+
+  /**
+   * Set HiveToVertex class used with HiveVertexInputFormat. Also registers
+   * it in the configuration under {@link HiveVertexReader#HIVE_TO_VERTEX_KEY}.
+   *
+   * @param hiveToVertexClass HiveToVertex class
+   */
+  public void setHiveToVertexClass(
+      Class<? extends HiveToVertex> hiveToVertexClass) {
+    this.hiveToVertexClass = hiveToVertexClass;
+    conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass,
+        HiveToVertex.class);
+  }
+
+  /**
+   * Get HiveToEdge class used with HiveEdgeInputFormat
+   *
+   * @return HiveToEdge class, or null if edge input is disabled
+   */
+  public Class<? extends HiveToEdge> getHiveToEdgeClass() {
+    return hiveToEdgeClass;
+  }
+
+  /**
+   * Whether to use edge input.
+   *
+   * @return true if edge input enabled (HiveToEdge class is set).
+   */
+  public boolean hasEdgeInput() {
+    return hiveToEdgeClass != null;
+  }
+
+  /**
+   * Set HiveToEdge class used with HiveEdgeInputFormat. Also registers it in
+   * the configuration under {@link HiveEdgeReader#EDGE_CREATOR_KEY}.
+   *
+   * @param hiveToEdgeClass HiveToEdge class
+   */
+  public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
+    this.hiveToEdgeClass = hiveToEdgeClass;
+    conf.setClass(HiveEdgeReader.EDGE_CREATOR_KEY, hiveToEdgeClass,
+        HiveToEdge.class);
+  }
+
+  /**
+   * Get class used to write vertices to Hive.
+   *
+   * @return class for writing vertices to Hive
+   */
+  public Class<? extends VertexToHive> getVertexToHiveClass() {
+    return vertexToHiveClass;
+  }
+
+  /**
+   * Whether we are writing vertices out.
+   *
+   * @return true if vertex output enabled
+   */
+  public boolean hasVertexOutput() {
+    return !skipOutput && vertexToHiveClass != null;
+  }
+
+  /**
+   * Set class used to write vertices to Hive. Also registers it in the
+   * configuration under {@link HiveVertexWriter#VERTEX_TO_HIVE_KEY}.
+   *
+   * @param vertexToHiveClass class for writing vertices to Hive.
+   */
+  public void setVertexToHiveClass(
+      Class<? extends VertexToHive> vertexToHiveClass) {
+    this.vertexToHiveClass = vertexToHiveClass;
+    conf.setClass(HiveVertexWriter.VERTEX_TO_HIVE_KEY, vertexToHiveClass,
+        VertexToHive.class);
+  }
+
+  /**
+   * main method
+   * @param args system arguments
+   * @throws Exception any errors from Hive Giraph Runner
+   */
+  public static void main(String[] args) throws Exception {
+    HiveGiraphRunner runner = new HiveGiraphRunner();
+    System.exit(ToolRunner.run(runner, args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    // process args
+    try {
+      handleCommandLine(args);
+    } catch (InterruptedException e) {
+      // handleCommandLine() throws this after printing help: not an error.
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive
+    adjustConfigurationForHive();
+
+    // setup GiraphJob; it is built from getConf(), so every key the setters
+    // registered above is carried into the job configuration.
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    GiraphConfiguration giraphConf = job.getConfiguration();
+    giraphConf.setVertexClass(vertexClass);
+
+    setupHiveInputs(giraphConf);
+    setupHiveOutput(giraphConf);
+
+    giraphConf.setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    logOptions(giraphConf);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Initialize hive input settings
+   *
+   * @param conf Configuration to write to
+   * @throws TException thrift problem
+   */
+  private void setupHiveInputs(GiraphConfiguration conf) throws TException {
+    if (hiveToVertexClass != null) {
+      HiveApiInputFormat.initProfile(conf, hiveVertexInputDescription,
+          HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+      conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+    }
+
+    if (hiveToEdgeClass != null) {
+      HiveApiInputFormat.initProfile(conf, hiveEdgeInputDescription,
+          HiveProfiles.EDGE_INPUT_PROFILE_ID);
+      conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    }
+  }
+
+  /**
+   * Initialize hive output settings
+   *
+   * @param conf Configuration to write to
+   * @throws TException thrift problem
+   */
+  private void setupHiveOutput(GiraphConfiguration conf) throws TException {
+    if (skipOutput) {
+      LOG.warn("run: Warning - Output will be skipped!");
+    } else if (vertexToHiveClass != null) {
+      HiveApiOutputFormat.initProfile(conf, hiveOutputDescription,
+          HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+      conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
+    } else {
+      LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
+          " not set");
+    }
+  }
+
+  /**
+   * Ship hive-site.xml and the client's HADOOP_CLASSPATH jars to workers.
+   * Each step is skipped with a warning when its source is unavailable.
+   */
+  private void adjustConfigurationForHive() {
+    // when output partitions are used, workers register them to the
+    // metastore at cleanup stage, and on HiveConf's initialization, it
+    // looks for hive-site.xml.
+    URL hiveSiteUrl = conf.getClassLoader().getResource("hive-site.xml");
+    if (hiveSiteUrl == null) {
+      LOG.warn("adjustConfigurationForHive: hive-site.xml not found on " +
+          "the classpath, it will not be shipped to workers");
+    } else {
+      addToStringCollection(conf, "tmpfiles", hiveSiteUrl.toString());
+    }
+
+    // Or, more effectively, we can provide all the jars client needed to
+    // the workers as well
+    String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+    if (hadoopClasspath == null) {
+      LOG.warn("adjustConfigurationForHive: HADOOP_CLASSPATH is not set, " +
+          "no client jars will be shipped to workers");
+      return;
+    }
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopClasspath.split(File.pathSeparator)) {
+      File file = new File(jarPath);
+      if (file.exists() && file.isFile()) {
+        hadoopJarURLs.add(file.toURI().toString());
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+   * process arguments
+   * @param args to process
+   * @return CommandLine instance
+   * @throws org.apache.commons.cli.ParseException error parsing arguments
+   * @throws InterruptedException thrown after printing help (no/-h args)
+   */
+  private CommandLine handleCommandLine(String[] args) throws ParseException,
+      InterruptedException {
+    Options options = new Options();
+    addOptions(options);
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    handleClassOptions(cmdln);
+
+    if (cmdln.hasOption("skipOutput")) {
+      skipOutput = true;
+    }
+
+    // Hive input/output converter classes are deliberately not required
+    // here: setupHiveInputs() and setupHiveOutput() skip (or log) whatever
+    // is not configured.
+    String workersStr = cmdln.getOptionValue("workers");
+    if (workersStr == null) {
+      throw new IllegalArgumentException(
+          "Need to choose the number of workers (-w)");
+    }
+
+    handleTableOptions(cmdln);
+
+    workers = Integer.parseInt(workersStr);
+
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments
+    processHiveConfOptions(cmdln);
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+   * Resolve the Giraph/Hive class options, registering them via setters.
+   *
+   * @param cmdln Command line options
+   * @throws IllegalArgumentException if no Vertex class is available or a
+   *         class name is invalid
+   */
+  private void handleClassOptions(CommandLine cmdln) {
+    String vertexClassStr = cmdln.getOptionValue("vertexClass");
+    if (vertexClassStr != null) {
+      setVertexClass(findClass(vertexClassStr, Vertex.class));
+    }
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph " + Vertex.class.getSimpleName() +
+          " class name (-vertexClass) to use");
+    }
+
+    // The setters (not plain field writes) are used so that each class is
+    // also stored in the configuration key its reader/writer looks up.
+    String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
+    if (hiveToVertexClassStr != null) {
+      setHiveToVertexClass(
+          findClass(hiveToVertexClassStr, HiveToVertex.class));
+    }
+
+    String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
+    if (hiveToEdgeClassStr != null) {
+      setHiveToEdgeClass(findClass(hiveToEdgeClassStr, HiveToEdge.class));
+    }
+
+    String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass");
+    if (vertexToHiveClassStr != null) {
+      setVertexToHiveClass(
+          findClass(vertexToHiveClassStr, VertexToHive.class));
+    }
+  }
+
+  /**
+   * Validate table options and fill in the Hive input/output descriptions.
+   *
+   * @param cmdln Command line options
+   * @throws IllegalArgumentException if a required table name is missing
+   */
+  private void handleTableOptions(CommandLine cmdln) {
+    String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable");
+    if (vertexInputTableStr == null && hiveToVertexClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the vertex input table name (-vi)");
+    }
+
+    String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable");
+    if (edgeInputTableStr == null && hiveToEdgeClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the edge input table name (-ei)");
+    }
+
+    // An output table is only needed when output is actually written.
+    String outputTableStr = cmdln.getOptionValue("outputTable");
+    if (outputTableStr == null && !skipOutput) {
+      throw new IllegalArgumentException(
+          "Need to set the output table name (-o)");
+    }
+
+    String dbName = cmdln.getOptionValue("dbName", "default");
+    hiveVertexInputDescription.setDbName(dbName);
+    hiveEdgeInputDescription.setDbName(dbName);
+    hiveOutputDescription.setDbName(dbName);
+
+    hiveEdgeInputDescription.setPartitionFilter(
+        cmdln.getOptionValue("edgeInputFilter"));
+    hiveEdgeInputDescription.setTableName(edgeInputTableStr);
+
+    hiveVertexInputDescription.setPartitionFilter(
+        cmdln.getOptionValue("vertexInputFilter"));
+    hiveVertexInputDescription.setTableName(vertexInputTableStr);
+
+    hiveOutputDescription.setTableName(outputTableStr);
+    hiveOutputDescription.setPartitionValues(
+        parsePartitionValues(cmdln.getOptionValue("outputPartition")));
+  }
+
+  /**
+   * Process -hiveconf options from command line. Values for "tmpjars" and
+   * "tmpfiles" are appended to the existing lists; other keys are set.
+   *
+   * @param cmdln Command line options
+   */
+  private void processHiveConfOptions(CommandLine cmdln) {
+    // getOptionValues() returns null when the option was never given (e.g.
+    // -D pairs may already have been consumed by ToolRunner's
+    // GenericOptionsParser).
+    String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+    if (hiveconfs == null) {
+      return;
+    }
+    for (String hiveconf : hiveconfs) {
+      String[] keyval = hiveconf.split("=", 2);
+      if (keyval.length != 2) {
+        LOG.warn("processHiveConfOptions: Ignoring malformed -hiveconf " +
+            "argument (expected property=value): " + hiveconf);
+        continue;
+      }
+      String name = keyval[0];
+      String value = keyval[1];
+      if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+        addToStringCollection(conf, name, value);
+      } else {
+        conf.set(name, value);
+      }
+    }
+  }
+
+  /**
+   * Parse a partition spec of the form "a=1,b=two".
+   *
+   * @param outputTablePartitionString table partition string, may be null
+   * @return Map of partition column to value, or null if input is null
+   * @throws IllegalArgumentException if an entry is not exactly key=value
+   */
+  public static Map<String, String> parsePartitionValues(
+      String outputTablePartitionString) {
+    if (outputTablePartitionString == null) {
+      return null;
+    }
+    Splitter commaSplitter = Splitter.on(',').omitEmptyStrings().trimResults();
+    Splitter equalSplitter = Splitter.on('=').omitEmptyStrings().trimResults();
+    Map<String, String> partitionValues = Maps.newHashMap();
+    for (String keyValStr : commaSplitter.split(outputTablePartitionString)) {
+      List<String> keyVal = Lists.newArrayList(equalSplitter.split(keyValStr));
+      if (keyVal.size() != 2) {
+        throw new IllegalArgumentException(
+            "Unrecognized partition value format: " +
+            outputTablePartitionString);
+      }
+      partitionValues.put(keyVal.get(0), keyVal.get(1));
+    }
+    return partitionValues;
+  }
+
+  /**
+   * Add hive-related options to command line parser options. Class options
+   * are only offered when the class was not already set programmatically.
+   *
+   * @param options Options to use
+   */
+  private void addOptions(Options options) {
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+        "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+          "Giraph Vertex class to use");
+    }
+
+    options.addOption("db", "dbName", true, "Hive database name");
+
+    // Vertex input settings
+    if (hiveToVertexClass == null) {
+      options.addOption(null, "hiveToVertexClass", true,
+          "Giraph " + HiveToVertex.class.getSimpleName() + " class to use");
+    }
+    options.addOption("vi", "vertexInputTable", true,
+        "Vertex input table name");
+    options.addOption("VI", "vertexInputFilter", true,
+        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\")");
+
+    // Edge input settings
+    if (hiveToEdgeClass == null) {
+      options.addOption(null, "hiveToEdgeClass", true,
+          "Giraph " + HiveToEdge.class.getSimpleName() + " class to use");
+    }
+    options.addOption("ei", "edgeInputTable", true,
+        "Edge input table name");
+    options.addOption("EI", "edgeInputFilter", true,
+        "Edge input table filter expression (e.g., \"a<2 AND b='two'\")");
+
+    // Vertex output settings
+    if (vertexToHiveClass == null) {
+      options.addOption(null, "vertexToHiveClass", true,
+          "Giraph " + VertexToHive.class.getSimpleName() + " class to use");
+    }
+
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("s", "skipOutput", false, "Skip output?");
+  }
+
+  /**
+   * Append strings to a comma-separated collection in the configuration.
+   * @param conf Configuration
+   * @param name configuration key to append to
+   * @param values values to append
+   */
+  private static void addToStringCollection(Configuration conf, String name,
+      String... values) {
+    addToStringCollection(conf, name, Arrays.asList(values));
+  }
+
+  /**
+   * Append strings to a comma-separated collection in the configuration.
+   * @param conf Configuration
+   * @param name configuration key to append to
+   * @param values values to append
+   */
+  private static void addToStringCollection(
+      Configuration conf, String name, Collection<? extends String> values) {
+    Collection<String> tmpfiles = conf.getStringCollection(name);
+    tmpfiles.addAll(values);
+    conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+  }
+
+  /**
+   * Load a class by name and check that it extends/implements base.
+   *
+   * @param className to find
+   * @param base base class
+   * @param <T> class type found
+   * @return the loaded class, typed as a subclass of base (never null)
+   * @throws IllegalArgumentException if the class cannot be loaded or is not
+   *         assignable to base
+   */
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(
+          className + ": Invalid class name", e);
+    }
+    if (!base.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException(
+          className + ": Not a subclass of " + base.getName());
+    }
+    return cls.asSubclass(base);
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Override this method to add more command-line options. You can process
+   * them by also overriding {@link #processMoreArguments(CommandLine)}.
+   *
+   * @param options Options
+   */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+   * Override this method to process additional command-line arguments. You
+   * may want to declare additional options by also overriding
+   * {@link #addMoreOptions(org.apache.commons.cli.Options)}.
+   *
+   * @param cmd Command
+   */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+   * Override this method to do additional setup with the GiraphJob that will
+   * run.
+   *
+   * @param job GiraphJob that is going to run
+   */
+  protected void initGiraphJob(GiraphJob job) { }
+
+  /**
+   * Log the options set by user. Unset classes and descriptions are omitted.
+   *
+   * @param giraphConf GiraphConfiguration
+   */
+  private void logOptions(GiraphConfiguration giraphConf) {
+    GiraphClasses classes = new GiraphClasses(giraphConf);
+
+    LOG.info(getClass().getSimpleName() + " with");
+
+    logClassOption("vertexClass", vertexClass);
+
+    logClassOption("hiveToVertexClass", hiveToVertexClass);
+    logClassOption("vertexInputFormatClass",
+        classes.getVertexInputFormatClass());
+    logInputDesc(hiveVertexInputDescription, "vertex");
+
+    logClassOption("hiveToEdgeClass", hiveToEdgeClass);
+    logClassOption("edgeInputFormatClass", classes.getEdgeInputFormatClass());
+    logInputDesc(hiveEdgeInputDescription, "edge");
+
+    if (hiveOutputDescription.getTableName() != null) {
+      LOG.info(LOG_PREFIX + "-outputTable=" +
+          hiveOutputDescription.getTableName());
+    }
+    if (hiveOutputDescription.hasPartitionValues()) {
+      LOG.info(LOG_PREFIX + "-outputPartition=\"" +
+          hiveOutputDescription.getPartitionValues() + "\"");
+    }
+    logClassOption("outputFormatClass", classes.getVertexOutputFormatClass());
+
+    LOG.info(LOG_PREFIX + "-workers=" + workers);
+  }
+
+  /**
+   * Log "-option=CanonicalName" for a class option, skipping null classes
+   * (e.g. an input or output format that was never configured).
+   *
+   * @param option option name without leading dash
+   * @param cls class to log, may be null
+   */
+  private static void logClassOption(String option, Class<?> cls) {
+    if (cls != null) {
+      LOG.info(LOG_PREFIX + "-" + option + "=" + cls.getCanonicalName());
+    }
+  }
+
+  /**
+   * Helper to log input description with a name
+   *
+   * @param inputDesc input description to log
+   * @param name String prefix name
+   */
+  private void logInputDesc(HiveInputDescription inputDesc, String name) {
+    if (inputDesc.hasTableName()) {
+      LOG.info(
+          LOG_PREFIX + "-" + name + "InputTable=" + inputDesc.getTableName());
+    }
+    if (inputDesc.hasPartitionFilter()) {
+      LOG.info(LOG_PREFIX + "-" + name + "InputFilter=\"" +
+          inputDesc.getPartitionFilter() + "\"");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
new file mode 100644
index 0000000..b0ddc48
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+/**
+ * Profile IDs used throughout the Giraph Hive code. A profile ID is handed
+ * to the hive-io input/output formats (for example via
+ * HiveApiInputFormat#setMyProfileId) so that each Hive input or output
+ * is identified by its own name.
+ *
+ * This is a constants holder: it cannot be instantiated or subclassed.
+ */
+public final class HiveProfiles {
+  /** Profile ID for reading vertices from Hive */
+  public static final String VERTEX_INPUT_PROFILE_ID = "vertex_input_profile";
+  /** Profile ID for reading edges from Hive */
+  public static final String EDGE_INPUT_PROFILE_ID = "edge_input_profile";
+  /** Profile ID for reading vertex values from Hive */
+  public static final String VERTEX_VALUE_INPUT_PROFILE_ID =
+      "vertex_value_input_profile";
+  /** Profile ID for writing vertices to Hive */
+  public static final String VERTEX_OUTPUT_PROFILE_ID = "vertex_output_profile";
+
+  /** Disable creation: this class only holds constants */
+  private HiveProfiles() { }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
new file mode 100644
index 0000000..1535b18
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Common Hive related utilities.
+ */
+package org.apache.giraph.hive.common;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
new file mode 100644
index 0000000..17405c8
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link EdgeInputFormat} for reading edges from Hive. Split computation and
+ * record reading are delegated to a {@link HiveApiInputFormat} registered
+ * under {@link HiveProfiles#EDGE_INPUT_PROFILE_ID}.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HiveEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create edge input format, backed by a Hive input format that uses the
+   * edge input profile.
+   */
+  public HiveEdgeInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+    hiveInputFormat.setMyProfileId(HiveProfiles.EDGE_INPUT_PROFILE_ID);
+  }
+
+  /**
+   * Get the input splits computed by the underlying Hive input format.
+   *
+   * @param context job context
+   * @param numWorkers number of workers; ignored here, the splits come
+   *                   entirely from the underlying Hive input format
+   * @return input splits for the Hive edge input
+   * @throws IOException if the Hive input format fails to compute splits
+   * @throws InterruptedException if interrupted while computing splits
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hiveInputFormat.getSplits(context);
+  }
+
+  /**
+   * Create a {@link HiveEdgeReader} for a split. The reader gets the table
+   * schema and a Hive record reader for the split, and is initialized
+   * before being returned.
+   *
+   * @param split input split to read
+   * @param context task attempt context
+   * @return initialized edge reader for the split
+   * @throws IOException if creating or initializing the reader fails
+   * @throws IllegalStateException if interrupted while creating or
+   *         initializing the reader (the thread's interrupt flag is restored)
+   */
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit split,
+                                           TaskAttemptContext context)
+    throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
+    reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+
+    try {
+      RecordReader<WritableComparable, HiveRecord> baseReader =
+          hiveInputFormat.createRecordReader(split, context);
+      reader.setHiveRecordReader(baseReader);
+      reader.initialize(split, context);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt so code further up the stack can react to it;
+      // wrapping the exception alone would silently clear the signal.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Could not create edge record reader", e);
+    }
+    return reader;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
new file mode 100644
index 0000000..34dc4df
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.graph.MutableEdge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+
+import java.io.IOException;
+
+/**
+ * A reader for reading edges from Hive.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
+ implements EdgeReader<I, E> {
+ /** Configuration key for edge creator class */
+ public static final String EDGE_CREATOR_KEY = "giraph.hive.to.edge.class";
+ /** Configuration key for whether to reuse edge */
+ public static final String REUSE_EDGE_KEY = "giraph.hive.reuse.edge";
+
+ /** Underlying Hive RecordReader used */
+ private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+ /** Schema for table in Hive */
+ private HiveTableSchema tableSchema;
+
+ /** Configuration */
+ private ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf;
+
+ /** User class to create edges from a HiveRecord */
+ private HiveToEdge<I, E> hiveToEdge;
+ /**
+ * If we are reusing edges this will be the single edge to read into.
+ * Otherwise if it's null we will create a new edge each time.
+ */
+ private MutableEdge<I, E> edgeToReuse = null;
+
+ /**
+ * Get underlying Hive record reader used.
+ *
+ * @return RecordReader from Hive
+ */
+ public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+ return hiveRecordReader;
+ }
+
+ /**
+ * Set underlying Hive record reader used.
+ *
+ * @param hiveRecordReader RecordReader to read from Hive.
+ */
+ public void setHiveRecordReader(
+ RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+ this.hiveRecordReader = hiveRecordReader;
+ }
+
+ /**
+ * Get Hive table schema for table being read from.
+ *
+ * @return Hive table schema for table
+ */
+ public HiveTableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ /**
+ * Set Hive schema for table being read from.
+ *
+ * @param tableSchema Hive table schema
+ */
+ public void setTableSchema(HiveTableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ /**
+ * Get our Configuration.
+ *
+ * @return ImmutableClassesGiraphConfiguration
+ */
+ public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
+ return conf;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ hiveRecordReader.initialize(inputSplit, context);
+ conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
+ instantiateHiveToEdgeFromConf();
+ if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
+ edgeToReuse = conf.createMutableEdge();
+ }
+ }
+
+ /**
+ * Retrieve the user's HiveEdgeCreator from the Configuration.
+ *
+ * @throws IOException if anything goes wrong reading from Configuration
+ */
+ private void instantiateHiveToEdgeFromConf() throws IOException {
+ Class<? extends HiveToEdge> klass = conf.getClass(EDGE_CREATOR_KEY,
+ null, HiveToEdge.class);
+ if (klass == null) {
+ throw new IOException(EDGE_CREATOR_KEY + " not set in conf");
+ }
+ hiveToEdge = ReflectionUtils.newInstance(klass, conf);
+ hiveToEdge.setTableSchema(tableSchema);
+ }
+
+ @Override
+ public boolean nextEdge() throws IOException, InterruptedException {
+ return hiveRecordReader.nextKeyValue();
+ }
+
+ @Override
+ public void close() throws IOException {
+ hiveRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return hiveRecordReader.getProgress();
+ }
+
+ @Override
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ return hiveToEdge.getSourceVertexId(hiveRecordReader.getCurrentValue());
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() throws IOException,
+ InterruptedException {
+ HiveRecord record = hiveRecordReader.getCurrentValue();
+ I targetId = hiveToEdge.getTargetVertexId(record);
+ E edgeValue = hiveToEdge.getEdgeValue(record);
+ if (edgeToReuse == null) {
+ return EdgeFactory.create(targetId, edgeValue);
+ } else {
+ edgeToReuse.setTargetVertexId(targetId);
+ edgeToReuse.setValue(edgeValue);
+ return edgeToReuse;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
new file mode 100644
index 0000000..e800321
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * User hook that turns a Hive record into the parts of an edge: its source
+ * vertex id, its target vertex id and its value. Implementations also
+ * receive the Hive table schema through {@link HiveTableSchemaAware}.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public interface HiveToEdge<I extends WritableComparable, E extends Writable>
+    extends HiveTableSchemaAware {
+  /**
+   * Extract the id of the vertex the edge starts from.
+   *
+   * @param hiveRecord Hive record holding the edge
+   * @return id of the source vertex
+   */
+  I getSourceVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Extract the id of the vertex the edge points to.
+   *
+   * @param hiveRecord Hive record holding the edge
+   * @return id of the target vertex
+   */
+  I getTargetVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Extract the value carried by the edge.
+   *
+   * @param hiveRecord Hive record holding the edge
+   * @return value of the edge
+   */
+  E getEdgeValue(HiveReadableRecord hiveRecord);
+}
|