giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 903302d
Date Tue, 23 Apr 2013 19:40:48 GMT
Updated Branches:
  refs/heads/trunk 14171ad3c -> 903302d12


GIRAPH-648: Allow IO formats to add parameters to Configuration (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/903302d1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/903302d1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/903302d1

Branch: refs/heads/trunk
Commit: 903302d12fb2af7788a8745af1bd05003ee985dd
Parents: 14171ad
Author: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Authored: Tue Apr 23 12:39:38 2013 -0700
Committer: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Committed: Tue Apr 23 12:39:38 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/bsp/BspOutputFormat.java     |    4 +-
 .../apache/giraph/conf/GiraphConfiguration.java    |   68 +++++
 .../conf/ImmutableClassesGiraphConfiguration.java  |   68 ++++-
 .../java/org/apache/giraph/io/EdgeInputFormat.java |   14 +-
 .../org/apache/giraph/io/VertexInputFormat.java    |   15 +-
 .../org/apache/giraph/io/VertexOutputFormat.java   |    8 +
 .../giraph/io/internal/WrappedEdgeInputFormat.java |  116 +++++++
 .../io/internal/WrappedVertexInputFormat.java      |  113 +++++++
 .../io/internal/WrappedVertexOutputFormat.java     |  166 +++++++++++
 .../apache/giraph/io/internal/package-info.java    |   21 ++
 .../MultiThreadedSuperstepOutput.java              |    2 +-
 .../SynchronizedSuperstepOutput.java               |    2 +-
 .../org/apache/giraph/master/BspServiceMaster.java |    4 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |    2 +-
 .../giraph/worker/EdgeInputSplitsCallable.java     |    2 +-
 .../giraph/worker/VertexInputSplitsCallable.java   |    2 +-
 .../test/java/org/apache/giraph/TestBspBasic.java  |    2 +-
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |  232 +++++----------
 .../giraph/hive/common/GiraphHiveConstants.java    |   58 ++++-
 .../apache/giraph/hive/common/HiveProfiles.java    |   34 ---
 .../org/apache/giraph/hive/common/HiveUtils.java   |  118 ++++++++
 .../hive/input/edge/HiveEdgeInputFormat.java       |   24 ++-
 .../hive/input/vertex/HiveVertexInputFormat.java   |   24 ++-
 .../giraph/hive/output/HiveVertexOutputFormat.java |   24 ++-
 .../giraph/hive/output/HiveVertexWriter.java       |    9 +-
 26 files changed, 889 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 27eab13..db78bb1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-648: Allow IO formats to add parameters to Configuration (majakabiljo)
+
   GIRAPH-635: Website Documentation: Missing presentations (ssc)
 
   GIRAPH-647: Update HiveIO to 0.8 (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
index 574895c..7e7c65f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
@@ -47,7 +47,7 @@ public class BspOutputFormat extends OutputFormat<Text, Text> {
           " will not check anything");
       return;
     }
-    conf.createVertexOutputFormat().checkOutputSpecs(context);
+    conf.createWrappedVertexOutputFormat().checkOutputSpecs(context);
   }
 
   @Override
@@ -60,7 +60,7 @@ public class BspOutputFormat extends OutputFormat<Text, Text> {
           "ImmutableOutputCommiter (does nothing).");
       return new ImmutableOutputCommitter();
     }
-    return conf.createVertexOutputFormat().getOutputCommitter(context);
+    return conf.createWrappedVertexOutputFormat().getOutputCommitter(context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7f9e38e..795cd0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -41,19 +41,27 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.DNS;
 
 import java.net.UnknownHostException;
+import java.util.Map;
 
 /**
  * Adds user methods specific to Giraph.  This will be put into an
  * ImmutableClassesGiraphConfiguration that provides the configuration plus
  * the immutable classes.
+ *
+ * Keeps track of parameters which were set so it can easily set them in
+ * another copy of configuration.
  */
 public class GiraphConfiguration extends Configuration
     implements GiraphConstants {
+  /** Configuration with parameters which were set in Giraph */
+  private final Configuration giraphSetParameters;
+
   /**
    * Constructor that creates the configuration
    */
   public GiraphConfiguration() {
     configureHadoopSecurity();
+    giraphSetParameters = new Configuration(false);
   }
 
   /**
@@ -64,6 +72,7 @@ public class GiraphConfiguration extends Configuration
   public GiraphConfiguration(Configuration conf) {
     super(conf);
     configureHadoopSecurity();
+    giraphSetParameters = new Configuration(false);
   }
 
   /**
@@ -939,4 +948,63 @@ public class GiraphConfiguration extends Configuration
   public boolean isStaticGraph() {
     return STATIC_GRAPH.isTrue(this);
   }
+
+  /**
+   * Put all parameters set in Giraph to another configuration
+   *
+   * @param conf Configuration
+   */
+  public void updateConfiguration(Configuration conf) {
+    for (Map.Entry<String, String> parameter : giraphSetParameters) {
+      conf.set(parameter.getKey(), parameter.getValue());
+    }
+  }
+
+  @Override
+  public void set(String name, String value) {
+    super.set(name, value);
+    giraphSetParameters.set(name, value);
+  }
+
+  @Override
+  public void setIfUnset(String name, String value) {
+    super.setIfUnset(name, value);
+    giraphSetParameters.set(name, get(name, value));
+  }
+
+  @Override
+  public void setInt(String name, int value) {
+    super.setInt(name, value);
+    giraphSetParameters.setInt(name, value);
+  }
+
+  @Override
+  public void setLong(String name, long value) {
+    super.setLong(name, value);
+    giraphSetParameters.setLong(name, value);
+  }
+
+  @Override
+  public void setFloat(String name, float value) {
+    super.setFloat(name, value);
+    giraphSetParameters.setFloat(name, value);
+  }
+
+  @Override
+  public void setBoolean(String name, boolean value) {
+    super.setBoolean(name, value);
+    giraphSetParameters.setBoolean(name, value);
+  }
+
+  @Override
+  public void setBooleanIfUnset(String name, boolean value) {
+    super.setBooleanIfUnset(name, value);
+    giraphSetParameters.setBoolean(name, getBoolean(name, value));
+  }
+
+  @Override
+  public void setClass(String name, Class<?> theClass, Class<?> xface) {
+    super.setClass(name, theClass, xface);
+    giraphSetParameters.setClass(name, theClass, xface);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 8dfe546..f5a926f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -31,6 +31,9 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
+import org.apache.giraph.io.internal.WrappedVertexInputFormat;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
 import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -173,16 +176,32 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a user vertex input format class
+   * Create a user vertex input format class.
+   * Note: Giraph should only use WrappedVertexInputFormat,
+   * which makes sure that Configuration parameters are set properly.
    *
    * @return Instantiated user vertex input format class
    */
-  public VertexInputFormat<I, V, E> createVertexInputFormat() {
+  private VertexInputFormat<I, V, E> createVertexInputFormat() {
     Class<? extends VertexInputFormat<I, V, E>> klass =
-        classes.getVertexInputFormatClass();
+        getVertexInputFormatClass();
     return ReflectionUtils.newInstance(klass, this);
   }
 
+  /**
+   * Create a wrapper for user vertex input format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user vertex input format
+   */
+  public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
+    WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
+        new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
+    configureIfPossible(wrappedVertexInputFormat);
+    return wrappedVertexInputFormat;
+  }
+
   @Override
   public boolean hasVertexOutputFormat() {
     return classes.hasVertexOutputFormat();
@@ -200,18 +219,33 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a user vertex output format class
+   * Create a user vertex output format class.
+   * Note: Giraph should only use WrappedVertexOutputFormat,
+   * which makes sure that Configuration parameters are set properly.
    *
    * @return Instantiated user vertex output format class
    */
-  @SuppressWarnings("rawtypes")
-  public VertexOutputFormat<I, V, E> createVertexOutputFormat() {
+  private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
     Class<? extends VertexOutputFormat<I, V, E>> klass =
-        classes.getVertexOutputFormatClass();
+        getVertexOutputFormatClass();
     return ReflectionUtils.newInstance(klass, this);
   }
 
   /**
+   * Create a wrapper for user vertex output format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user vertex output format
+   */
+  public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
+    WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
+        new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
+    configureIfPossible(wrappedVertexOutputFormat);
+    return wrappedVertexOutputFormat;
+  }
+
+  /**
    * Create the proper superstep output, based on the configuration settings.
    *
    * @param context Mapper context
@@ -246,16 +280,32 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a user edge input format class
+   * Create a user edge input format class.
+   * Note: Giraph should only use WrappedEdgeInputFormat,
+   * which makes sure that Configuration parameters are set properly.
    *
    * @return Instantiated user edge input format class
    */
-  public EdgeInputFormat<I, E> createEdgeInputFormat() {
+  private EdgeInputFormat<I, E> createEdgeInputFormat() {
     Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
     return ReflectionUtils.newInstance(klass, this);
   }
 
   /**
+   * Create a wrapper for user edge input format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user edge input format
+   */
+  public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
+    WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
+        new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
+    configureIfPossible(wrappedEdgeInputFormat);
+    return wrappedEdgeInputFormat;
+  }
+
+  /**
    * Get the user's subclassed {@link AggregatorWriter}.
    *
    * @return User's aggregator writer class

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
index 43cc7be..2aac1f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -22,16 +22,21 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Input format for reading single edges.  Provides access to
  * ImmutableClassesGiraphConfiguration.
  *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this input
+ * format (context in getSplits and createEdgeReader; methods invoked on
+ * EdgeReader). So if backing input format relies on some parameters from
+ * configuration, you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
  * @param <I> Vertex id
  * @param <E> Edge data
  */
@@ -40,11 +45,6 @@ public abstract class EdgeInputFormat<I extends WritableComparable,
     extends
     DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
     implements GiraphInputFormat {
-  @Override
-  public abstract List<InputSplit> getSplits(
-      JobContext context, int minSplitCountHint) throws IOException,
-      InterruptedException;
-
   /**
    * Create an edge reader for a given split. The framework will call
    * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
index b3f234f..c4d7fe2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
@@ -19,12 +19,11 @@
 package org.apache.giraph.io;
 
 import java.io.IOException;
-import java.util.List;
+
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -33,6 +32,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * vertices across the mappers, so keep that in consideration when implementing
  * getSplits().  Provides access to ImmutableClassesGiraphConfiguration.
  *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this input
+ * format (context in getSplits and createVertexReader; methods invoked on
+ * VertexReader). So if backing input format relies on some parameters from
+ * configuration, you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
@@ -42,11 +48,6 @@ public abstract class VertexInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
     implements GiraphInputFormat {
-  @Override
-  public abstract List<InputSplit> getSplits(
-    JobContext context, int minSplitCountHint)
-    throws IOException, InterruptedException;
-
   /**
    * Create a vertex reader for a given split. Guaranteed to have been
    * configured with setConf() prior to use.  The framework will also call

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
index 71eb665..154f7e4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -33,6 +33,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * directly after the Hadoop OutputFormat.
  * ImmutableClassesGiraphConfiguration is available
  *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this output
+ * format (context in createVertexWriter, checkOutputSpecs and
+ * getOutputCommitter; methods invoked on VertexWriter and OutputCommitter).
+ * So if backing output format relies on some parameters from configuration,
+ * you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
new file mode 100644
index 0000000..9b14727
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link EdgeInputFormat} to make sure proper configuration
+ * parameters are passed around, so that the user can set parameters in the
+ * configuration and they will be available in other methods related to this
+ * format.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class WrappedEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** {@link EdgeInputFormat} which is wrapped */
+  private EdgeInputFormat<I, E> originalInputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param edgeInputFormat Edge input format to wrap
+   */
+  public WrappedEdgeInputFormat(
+      EdgeInputFormat<I, E> edgeInputFormat) {
+    originalInputFormat = edgeInputFormat;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    return originalInputFormat.getSplits(context, minSplitCountHint);
+  }
+
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    getConf().updateConfiguration(context.getConfiguration());
+    final EdgeReader<I, E> edgeReader =
+        originalInputFormat.createEdgeReader(split, context);
+    return new EdgeReader<I, E>() {
+      @Override
+      public void setConf(
+          ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+        super.setConf(conf);
+        edgeReader.setConf(conf);
+      }
+
+      @Override
+      public void initialize(InputSplit inputSplit,
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        WrappedEdgeInputFormat.this.getConf().updateConfiguration(
+            context.getConfiguration());
+        edgeReader.initialize(inputSplit, context);
+      }
+
+      @Override
+      public boolean nextEdge() throws IOException, InterruptedException {
+        return edgeReader.nextEdge();
+      }
+
+      @Override
+      public I getCurrentSourceId() throws IOException, InterruptedException {
+        return edgeReader.getCurrentSourceId();
+      }
+
+      @Override
+      public Edge<I, E> getCurrentEdge() throws IOException,
+          InterruptedException {
+        return edgeReader.getCurrentEdge();
+      }
+
+      @Override
+      public void close() throws IOException {
+        edgeReader.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return edgeReader.getProgress();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
new file mode 100644
index 0000000..4f0cfea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link VertexInputFormat} to make sure proper configuration
+ * parameters are passed around, so that the user can set parameters in the
+ * configuration and they will be available in other methods related to this
+ * format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class WrappedVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexInputFormat<I, V, E> {
+  /** {@link VertexInputFormat} which is wrapped */
+  private VertexInputFormat<I, V, E> originalInputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param vertexInputFormat Vertex input format to wrap
+   */
+  public WrappedVertexInputFormat(
+      VertexInputFormat<I, V, E> vertexInputFormat) {
+    originalInputFormat = vertexInputFormat;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    return originalInputFormat.getSplits(context, minSplitCountHint);
+  }
+
+  @Override
+  public VertexReader<I, V, E> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    getConf().updateConfiguration(context.getConfiguration());
+    final VertexReader<I, V, E> vertexReader =
+        originalInputFormat.createVertexReader(split, context);
+    return new VertexReader<I, V, E>() {
+      @Override
+      public void setConf(
+          ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+        super.setConf(conf);
+        vertexReader.setConf(conf);
+      }
+
+      @Override
+      public void initialize(InputSplit inputSplit,
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        WrappedVertexInputFormat.this.getConf().updateConfiguration(
+            context.getConfiguration());
+        vertexReader.initialize(inputSplit, context);
+      }
+
+      @Override
+      public boolean nextVertex() throws IOException, InterruptedException {
+        return vertexReader.nextVertex();
+      }
+
+      @Override
+      public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+          InterruptedException {
+        return vertexReader.getCurrentVertex();
+      }
+
+      @Override
+      public void close() throws IOException {
+        vertexReader.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return vertexReader.getProgress();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
new file mode 100644
index 0000000..c1046ae
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps the user-set {@link VertexOutputFormat} so that configuration
+ * parameters are passed around properly: parameters the user sets in the
+ * configuration will be available in other methods related to this
+ * format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class WrappedVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /** {@link VertexOutputFormat} which is wrapped */
+  private VertexOutputFormat<I, V, E> originalOutputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param vertexOutputFormat Vertex output format to wrap
+   */
+  public WrappedVertexOutputFormat(
+      VertexOutputFormat<I, V, E> vertexOutputFormat) {
+    originalOutputFormat = vertexOutputFormat;
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    final VertexWriter<I, V, E> vertexWriter =
+        originalOutputFormat.createVertexWriter(context);
+    return new VertexWriter<I, V, E>() {
+      @Override
+      public void setConf(
+          ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+        super.setConf(conf);
+        vertexWriter.setConf(conf);
+      }
+
+      @Override
+      public void initialize(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        getConf().updateConfiguration(context.getConfiguration());
+        vertexWriter.initialize(context);
+      }
+
+      @Override
+      public void close(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        getConf().updateConfiguration(context.getConfiguration());
+        vertexWriter.close(context);
+      }
+
+      @Override
+      public void writeVertex(
+          Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+        vertexWriter.writeVertex(vertex);
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(
+      JobContext context) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    originalOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    final OutputCommitter outputCommitter =
+        originalOutputFormat.getOutputCommitter(context);
+    return new OutputCommitter() {
+      @Override
+      public void setupJob(JobContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.setupJob(context);
+      }
+
+      @Override
+      public void setupTask(TaskAttemptContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.setupTask(context);
+      }
+
+      @Override
+      public boolean needsTaskCommit(
+          TaskAttemptContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        return outputCommitter.needsTaskCommit(context);
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.commitTask(context);
+      }
+
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.abortTask(context);
+      }
+
+      @Override
+      public void commitJob(JobContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.commitJob(context);
+
+      }
+
+      @Override
+      public void cleanupJob(JobContext context) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.cleanupJob(context);
+
+      }
+
+      @Override
+      public void abortJob(JobContext context,
+          JobStatus.State state) throws IOException {
+        getConf().updateConfiguration(context.getConfiguration());
+        outputCommitter.abortJob(context, state);
+
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/internal/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/package-info.java
new file mode 100644
index 0000000..c98168d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input/Output classes for internal use only.
+ */
+package org.apache.giraph.io.internal;

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index af086e1..a8dff87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -68,7 +68,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
       ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.configuration = conf;
-    vertexOutputFormat = conf.createVertexOutputFormat();
+    vertexOutputFormat = conf.createWrappedVertexOutputFormat();
     this.context = context;
     availableVertexWriters = Lists.newArrayList();
     occupiedVertexWriters = Sets.newHashSet();

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
index 2a7af29..f94bd56 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -62,7 +62,7 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
     this.context = context;
     try {
       vertexWriter =
-          conf.createVertexOutputFormat().createVertexWriter(context);
+          conf.createWrappedVertexOutputFormat().createVertexWriter(context);
       vertexWriter.setConf(
           (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) conf);
       vertexWriter.initialize(context);

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index d01dbb4..2c1a679 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -688,7 +688,7 @@ public class BspServiceMaster<I extends WritableComparable,
       return 0;
     }
     VertexInputFormat<I, V, E> vertexInputFormat =
-        getConfiguration().createVertexInputFormat();
+        getConfiguration().createWrappedVertexInputFormat();
     return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
         "Vertex");
   }
@@ -700,7 +700,7 @@ public class BspServiceMaster<I extends WritableComparable,
       return 0;
     }
     EdgeInputFormat<I, E> edgeInputFormat =
-        getConfiguration().createEdgeInputFormat();
+        getConfiguration().createWrappedEdgeInputFormat();
     return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
         "Edge");
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 037cdfc..01937ab 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -933,7 +933,7 @@ else[HADOOP_NON_SECURE]*/
         "saveVertices: Starting to save " + numLocalVertices + " vertices " +
             "using " + numThreads + " threads");
     final VertexOutputFormat<I, V, E> vertexOutputFormat =
-        getConfiguration().createVertexOutputFormat();
+        getConfiguration().createWrappedVertexOutputFormat();
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
       @Override
       public Callable<Void> newCallable(int callableId) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index afb636b..de1e774 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -106,7 +106,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       GraphState<I, V, E, M> graphState) throws IOException,
       InterruptedException {
     EdgeInputFormat<I, E> edgeInputFormat =
-        configuration.createEdgeInputFormat();
+        configuration.createWrappedEdgeInputFormat();
     EdgeReader<I, E> edgeReader =
         edgeInputFormat.createEdgeReader(inputSplit, context);
     edgeReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index c426032..224856e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -112,7 +112,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       GraphState<I, V, E, M> graphState)
     throws IOException, InterruptedException {
     VertexInputFormat<I, V, E> vertexInputFormat =
-        configuration.createVertexInputFormat();
+        configuration.createWrappedVertexInputFormat();
     VertexReader<I, V, E> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index e034b2f..aa3b08c 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -129,7 +129,7 @@ public class
     vertex.initialize(new LongWritable(1), new IntWritable(1));
     System.out.println("testInstantiateVertex: Got vertex " + vertex);
     VertexInputFormat<LongWritable, IntWritable, FloatWritable>
-    inputFormat = configuration.createVertexInputFormat();
+    inputFormat = configuration.createWrappedVertexInputFormat();
 /*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
       List<InputSplit> splitArray =
           inputFormat.getSplits(

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index e6db762..8d67c1d 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -32,7 +32,6 @@ import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
-import org.apache.giraph.hive.output.HiveVertexWriter;
 import org.apache.giraph.hive.output.VertexToHive;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
@@ -42,28 +41,28 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
-import com.facebook.hiveio.input.HiveApiInputFormat;
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.output.HiveApiOutputFormat;
-import com.facebook.hiveio.output.HiveOutputDescription;
-import com.facebook.hiveio.schema.HiveTableSchemas;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_TABLE;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_SPLITS;
-import static org.apache.giraph.hive.common.HiveProfiles.EDGE_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_OUTPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
 
 /**
  * Hive Giraph Runner
@@ -84,18 +83,12 @@ public class HiveGiraphRunner implements Tool {
 
   /** Vertex creator from hive records. */
   private Class<? extends HiveToVertex> hiveToVertexClass;
-  /** hive vertex input information */
-  private  final HiveInputDescription hiveVertexInputDescription;
 
   /** Edge creator from hive records. */
   private Class<? extends HiveToEdge> hiveToEdgeClass;
-  /** hive edge input information */
-  private final HiveInputDescription hiveEdgeInputDescription;
 
   /** Hive Vertex writer */
   private Class<? extends VertexToHive> vertexToHiveClass;
-  /** hive output information */
-  private final HiveOutputDescription hiveOutputDescription;
   /** Skip output? (Useful for testing without writing) */
   private boolean skipOutput = false;
 
@@ -105,9 +98,6 @@ public class HiveGiraphRunner implements Tool {
   /** Create a new runner */
   public HiveGiraphRunner() {
     conf = new HiveConf(getClass());
-    hiveVertexInputDescription = new HiveInputDescription();
-    hiveEdgeInputDescription = new HiveInputDescription();
-    hiveOutputDescription = new HiveOutputDescription();
   }
 
   public Class<? extends Vertex> getVertexClass() {
@@ -118,18 +108,6 @@ public class HiveGiraphRunner implements Tool {
     this.vertexClass = vertexClass;
   }
 
-  public HiveInputDescription getHiveVertexInputDescription() {
-    return hiveVertexInputDescription;
-  }
-
-  public HiveOutputDescription getHiveOutputDescription() {
-    return hiveOutputDescription;
-  }
-
-  public HiveInputDescription getHiveEdgeInputDescription() {
-    return hiveEdgeInputDescription;
-  }
-
   public Class<? extends HiveToVertex> getHiveToVertexClass() {
     return hiveToVertexClass;
   }
@@ -198,8 +176,7 @@ public class HiveGiraphRunner implements Tool {
   public void setVertexToHiveClass(
       Class<? extends VertexToHive> vertexToHiveClass) {
     this.vertexToHiveClass = vertexToHiveClass;
-    conf.setClass(HiveVertexWriter.VERTEX_TO_HIVE_KEY, vertexToHiveClass,
-        VertexToHive.class);
+    VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass);
   }
 
   /**
@@ -251,21 +228,13 @@ public class HiveGiraphRunner implements Tool {
    */
   private void setupHiveInputs(GiraphConfiguration conf) throws TException {
     if (hiveToVertexClass != null) {
-      hiveVertexInputDescription.setNumSplits(HIVE_VERTEX_SPLITS.get(conf));
-      HiveApiInputFormat.setProfileInputDesc(conf, hiveVertexInputDescription,
-          VERTEX_INPUT_PROFILE_ID);
       conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
-      HiveTableSchemas.put(conf, VERTEX_INPUT_PROFILE_ID,
-          hiveVertexInputDescription.hiveTableName());
+      HIVE_VERTEX_INPUT_PROFILE_ID.set(conf, "vertex_input_profile");
     }
 
     if (hiveToEdgeClass != null) {
-      hiveEdgeInputDescription.setNumSplits(HIVE_EDGE_SPLITS.get(conf));
-      HiveApiInputFormat.setProfileInputDesc(conf, hiveEdgeInputDescription,
-          EDGE_INPUT_PROFILE_ID);
       conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
-      HiveTableSchemas.put(conf, EDGE_INPUT_PROFILE_ID,
-          hiveEdgeInputDescription.hiveTableName());
+      HIVE_EDGE_INPUT_PROFILE_ID.set(conf, "edge_input_profile");
     }
   }
 
@@ -279,11 +248,8 @@ public class HiveGiraphRunner implements Tool {
     if (skipOutput) {
       LOG.warn("run: Warning - Output will be skipped!");
     } else if (vertexToHiveClass != null) {
-      HiveApiOutputFormat.initProfile(conf, hiveOutputDescription,
-          VERTEX_OUTPUT_PROFILE_ID);
       conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
-      HiveTableSchemas.put(conf, VERTEX_OUTPUT_PROFILE_ID,
-          hiveOutputDescription.hiveTableName());
+      HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
     } else {
       LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
           " not set");
@@ -291,8 +257,8 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-  * set hive configuration
-  */
+   * set hive configuration
+   */
   private void adjustConfigurationForHive() {
     // when output partitions are used, workers register them to the
     // metastore at cleanup stage, and on HiveConf's initialization, it
@@ -316,14 +282,14 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-  * process arguments
-  * @param args to process
-  * @return CommandLine instance
-  * @throws org.apache.commons.cli.ParseException error parsing arguments
-  * @throws InterruptedException interrupted
-  */
+   * process arguments
+   * @param args to process
+   * @return CommandLine instance
+   * @throws org.apache.commons.cli.ParseException error parsing arguments
+   * @throws InterruptedException interrupted
+   */
   private CommandLine handleCommandLine(String[] args) throws ParseException,
-            InterruptedException {
+      InterruptedException {
     Options options = new Options();
     addOptions(options);
     addMoreOptions(options);
@@ -413,22 +379,21 @@ public class HiveGiraphRunner implements Tool {
     }
 
     String dbName = cmdln.getOptionValue("dbName", "default");
-    hiveVertexInputDescription.setDbName(dbName);
-    hiveEdgeInputDescription.setDbName(dbName);
-    hiveOutputDescription.setDbName(dbName);
-
-    hiveEdgeInputDescription.setPartitionFilter(
-        cmdln.getOptionValue("edgeInputFilter"));
-    hiveEdgeInputDescription.setTableName(edgeInputTableStr);
 
-    hiveVertexInputDescription.setPartitionFilter(
+    HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
+    HIVE_VERTEX_INPUT_TABLE.set(conf, vertexInputTableStr);
+    HIVE_VERTEX_INPUT_PARTITION.set(conf,
         cmdln.getOptionValue("vertexInputFilter"));
-    hiveVertexInputDescription.setTableName(vertexInputTableStr);
 
-    hiveOutputDescription.setTableName(cmdln.getOptionValue("outputTable"));
-    hiveOutputDescription.setPartitionValues(
-        parsePartitionValues(cmdln.getOptionValue("outputPartition"))
-    );
+    HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
+    HIVE_EDGE_INPUT_TABLE.set(conf, edgeInputTableStr);
+    HIVE_EDGE_INPUT_PARTITION.set(conf,
+        cmdln.getOptionValue("edgeInputFilter"));
+
+    HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
+    HIVE_VERTEX_OUTPUT_TABLE.set(conf, cmdln.getOptionValue("outputTable"));
+    HIVE_VERTEX_OUTPUT_PARTITION.set(conf,
+        cmdln.getOptionValue("outputPartition"));
 
     workers = Integer.parseInt(workersStr);
 
@@ -463,30 +428,6 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * @param outputTablePartitionString table partition string
-   * @return Map
-   */
-  public static Map<String, String> parsePartitionValues(
-      String outputTablePartitionString) {
-    if (outputTablePartitionString == null) {
-      return null;
-    }
-    Splitter commaSplitter = Splitter.on(',').omitEmptyStrings().trimResults();
-    Splitter equalSplitter = Splitter.on('=').omitEmptyStrings().trimResults();
-    Map<String, String> partitionValues = Maps.newHashMap();
-    for (String keyValStr : commaSplitter.split(outputTablePartitionString)) {
-      List<String> keyVal = Lists.newArrayList(equalSplitter.split(keyValStr));
-      if (keyVal.size() != 2) {
-        throw new IllegalArgumentException(
-            "Unrecognized partition value format: " +
-            outputTablePartitionString);
-      }
-      partitionValues.put(keyVal.get(0), keyVal.get(1));
-    }
-    return partitionValues;
-  }
-
-  /**
    * Add hive-related options to command line parser options
    *
    * @param options Options to use
@@ -542,37 +483,37 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-  * add string to collection
-  * @param conf Configuration
-  * @param name name to add
-  * @param values values for collection
-  */
+   * add string to collection
+   * @param conf Configuration
+   * @param name name to add
+   * @param values values for collection
+   */
   private static void addToStringCollection(Configuration conf, String name,
-                                              String... values) {
+      String... values) {
     addToStringCollection(conf, name, Arrays.asList(values));
   }
 
   /**
-  * add string to collection
-  * @param conf Configuration
-  * @param name to add
-  * @param values values for collection
-  */
+   * add string to collection
+   * @param conf Configuration
+   * @param name to add
+   * @param values values for collection
+   */
   private static void addToStringCollection(
-          Configuration conf, String name, Collection
-          <? extends String> values) {
+      Configuration conf, String name, Collection
+      <? extends String> values) {
     Collection<String> tmpfiles = conf.getStringCollection(name);
     tmpfiles.addAll(values);
     conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
   }
 
   /**
-  *
-  * @param className to find
-  * @param base  base class
-  * @param <T> class type found
-  * @return type found
-  */
+   *
+   * @param className to find
+   * @param base  base class
+   * @param <T> class type found
+   * @return type found
+   */
   private <T> Class<? extends T> findClass(String className, Class<T> base) {
     try {
       Class<?> cls = Class.forName(className);
@@ -596,30 +537,30 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-  * Override this method to add more command-line options. You can process
-  * them by also overriding {@link #processMoreArguments(CommandLine)}.
-  *
-  * @param options Options
-  */
+   * Override this method to add more command-line options. You can process
+   * them by also overriding {@link #processMoreArguments(CommandLine)}.
+   *
+   * @param options Options
+   */
   protected void addMoreOptions(Options options) {
   }
 
   /**
-  * Override this method to process additional command-line arguments. You
-  * may want to declare additional options by also overriding
-  * {@link #addMoreOptions(org.apache.commons.cli.Options)}.
-  *
-  * @param cmd Command
-  */
+   * Override this method to process additional command-line arguments. You
+   * may want to declare additional options by also overriding
+   * {@link #addMoreOptions(org.apache.commons.cli.Options)}.
+   *
+   * @param cmd Command
+   */
   protected void processMoreArguments(CommandLine cmd) {
   }
 
   /**
-  * Override this method to do additional setup with the GiraphJob that will
-  * run.
-  *
-  * @param job GiraphJob that is going to run
-  */
+   * Override this method to do additional setup with the GiraphJob that will
+   * run.
+   *
+   * @param job GiraphJob that is going to run
+   */
   protected void initGiraphJob(GiraphJob job) { }
 
   /**
@@ -641,7 +582,6 @@ public class HiveGiraphRunner implements Tool {
     if (classes.getVertexInputFormatClass() != null) {
       LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
           classes.getVertexInputFormatClass().getCanonicalName());
-      logInputDesc(hiveVertexInputDescription, "vertex");
     }
 
     if (hiveToEdgeClass != null) {
@@ -650,16 +590,9 @@ public class HiveGiraphRunner implements Tool {
     }
     if (classes.getEdgeInputFormatClass() != null) {
       LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" +
-        classes.getEdgeInputFormatClass().getCanonicalName());
-      logInputDesc(hiveEdgeInputDescription, "edge");
+          classes.getEdgeInputFormatClass().getCanonicalName());
     }
 
-    LOG.info(LOG_PREFIX + "-outputTable=" +
-        hiveOutputDescription.getTableName());
-    if (hiveOutputDescription.hasPartitionValues()) {
-      LOG.info(LOG_PREFIX + "-outputPartition=\"" +
-          hiveOutputDescription.getPartitionValues() + "\"");
-    }
     if (classes.getVertexOutputFormatClass() != null) {
       LOG.info(LOG_PREFIX + "-outputFormatClass=" +
           classes.getVertexOutputFormatClass().getCanonicalName());
@@ -667,21 +600,4 @@ public class HiveGiraphRunner implements Tool {
 
     LOG.info(LOG_PREFIX + "-workers=" + workers);
   }
-
-  /**
-   * Helper to log input description with a name
-   *
-   * @param inputDesc input description to log
-   * @param name String prefix name
-   */
-  private void logInputDesc(HiveInputDescription inputDesc, String name) {
-    if (inputDesc.hasTableName()) {
-      LOG.info(
-          LOG_PREFIX + "-" + name + "InputTable=" + inputDesc.getTableName());
-    }
-    if (inputDesc.hasPartitionFilter()) {
-      LOG.info(LOG_PREFIX + "-" + name + "InputFilter=\"" +
-          inputDesc.getPartitionFilter() + "\"");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
index f8363b1..49e1bb5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -20,27 +20,71 @@ package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.output.VertexToHive;
 
 /**
  * Constants for giraph-hive
  */
 public class GiraphHiveConstants {
-  /** Number of edge splits */
-  public static final IntConfOption HIVE_EDGE_SPLITS =
-      new IntConfOption("giraph.hive.input.edge.splits", 0);
+  /** Class for converting hive records to vertices */
+  public static final ClassConfOption<HiveToVertex> HIVE_TO_VERTEX_CLASS =
+      ClassConfOption.create("giraph.hive.to.vertex.class", null,
+          HiveToVertex.class);
+  /** Vertex input profile id */
+  public static final StrConfOption HIVE_VERTEX_INPUT_PROFILE_ID =
+      new StrConfOption("giraph.hive.input.vertex.profileId", "");
   /** Number of vertex splits */
   public static final IntConfOption HIVE_VERTEX_SPLITS =
       new IntConfOption("giraph.hive.input.vertex.splits", 0);
+  /** Vertex input database name */
+  public static final StrConfOption HIVE_VERTEX_INPUT_DATABASE =
+      new StrConfOption("giraph.hive.input.vertex.database", "");
+  /** Vertex input table name */
+  public static final StrConfOption HIVE_VERTEX_INPUT_TABLE =
+      new StrConfOption("giraph.hive.input.vertex.table", "");
+  /** Vertex input partition filter */
+  public static final StrConfOption HIVE_VERTEX_INPUT_PARTITION =
+      new StrConfOption("giraph.hive.input.vertex.partition", "");
+
   /** Class for converting hive records to edges */
   public static final ClassConfOption<HiveToEdge> HIVE_TO_EDGE_CLASS =
       ClassConfOption.create("giraph.hive.to.edge.class", null,
           HiveToEdge.class);
-  /** Class for converting hive records to vertices */
-  public static final ClassConfOption<HiveToVertex> HIVE_TO_VERTEX_CLASS =
-      ClassConfOption.create("giraph.hive.to.vertex.class", null,
-          HiveToVertex.class);
+  /** Edge input profile id */
+  public static final StrConfOption HIVE_EDGE_INPUT_PROFILE_ID =
+      new StrConfOption("giraph.hive.input.edge.profileId", "");
+  /** Number of edge splits */
+  public static final IntConfOption HIVE_EDGE_SPLITS =
+      new IntConfOption("giraph.hive.input.edge.splits", 0);
+  /** Edge input database name */
+  public static final StrConfOption HIVE_EDGE_INPUT_DATABASE =
+      new StrConfOption("giraph.hive.input.edge.database", "");
+  /** Edge input table name */
+  public static final StrConfOption HIVE_EDGE_INPUT_TABLE =
+      new StrConfOption("giraph.hive.input.edge.table", "");
+  /** Edge input partition filter */
+  public static final StrConfOption HIVE_EDGE_INPUT_PARTITION =
+      new StrConfOption("giraph.hive.input.edge.partition", "");
+
+  /** Class for converting vertices to Hive records */
+  public static final ClassConfOption<VertexToHive> VERTEX_TO_HIVE_CLASS =
+      ClassConfOption.create("giraph.vertex.to.hive.class", null,
+          VertexToHive.class);
+  /** Vertex output profile id */
+  public static final StrConfOption HIVE_VERTEX_OUTPUT_PROFILE_ID =
+      new StrConfOption("giraph.hive.output.vertex.profileId", "");
+  /** Vertex output database name */
+  public static final StrConfOption HIVE_VERTEX_OUTPUT_DATABASE =
+      new StrConfOption("giraph.hive.output.vertex.database", "");
+  /** Vertex output table name */
+  public static final StrConfOption HIVE_VERTEX_OUTPUT_TABLE =
+      new StrConfOption("giraph.hive.output.vertex.table", "");
+  /** Vertex output partition */
+  public static final StrConfOption HIVE_VERTEX_OUTPUT_PARTITION =
+      new StrConfOption("giraph.hive.output.vertex.partition", "");
 
   /** Don't construct */
   protected GiraphHiveConstants() { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
deleted file mode 100644
index 892d443..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.common;
-
-/**
- * Profiles used throughout Hive code.
- */
-public class HiveProfiles {
-  /** name of vertex input profile */
-  public static final String VERTEX_INPUT_PROFILE_ID = "vertex_input_profile";
-  /** name of edge input profile */
-  public static final String EDGE_INPUT_PROFILE_ID = "edge_input_profile";
-  /** Name of vertex output profile */
-  public static final String VERTEX_OUTPUT_PROFILE_ID = "vertex_output_profile";
-
-  /** Disable creation */
-  private HiveProfiles() { }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
new file mode 100644
index 0000000..a8c88f5
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+
+import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.output.HiveApiOutputFormat;
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.schema.HiveTableSchemas;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility methods for Hive IO
+ */
+public class HiveUtils {
+  /** Do not instantiate */
+  private HiveUtils() {
+  }
+
+  /**
+   * Initialize hive input, prepare Configuration parameters
+   *
+   * @param hiveInputFormat HiveApiInputFormat
+   * @param profileId Profile id
+   * @param dbName Database name
+   * @param tableName Table name
+   * @param partitionFilter Partition filter
+   * @param numSplits Number of splits
+   * @param conf Configuration
+   */
+  public static void initializeHiveInput(HiveApiInputFormat hiveInputFormat,
+      String profileId, String dbName, String tableName, String partitionFilter,
+      int numSplits, Configuration conf) {
+    hiveInputFormat.setMyProfileId(profileId);
+    HiveInputDescription inputDescription = new HiveInputDescription();
+    inputDescription.setDbName(dbName);
+    inputDescription.setTableName(tableName);
+    inputDescription.setPartitionFilter(partitionFilter);
+    inputDescription.setNumSplits(numSplits);
+    HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId);
+    HiveTableSchemas.put(conf, profileId, inputDescription.hiveTableName());
+  }
+
+  /**
+   * Initialize hive output, prepare Configuration parameters
+   *
+   * @param hiveOutputFormat HiveApiOutputFormat
+   * @param profileId Profile id
+   * @param dbName Database name
+   * @param tableName Table name
+   * @param partition Partition
+   * @param conf Configuration
+   */
+  public static void initializeHiveOutput(HiveApiOutputFormat hiveOutputFormat,
+      String profileId, String dbName, String tableName, String partition,
+      Configuration conf) {
+    hiveOutputFormat.setMyProfileId(profileId);
+    HiveOutputDescription outputDescription = new HiveOutputDescription();
+    outputDescription.setDbName(dbName);
+    outputDescription.setTableName(tableName);
+    outputDescription.setPartitionValues(parsePartitionValues(partition));
+    try {
+      HiveApiOutputFormat.initProfile(conf, outputDescription, profileId);
+    } catch (TException e) {
+      throw new IllegalStateException(
+          "initializeHiveOutput: TException occurred", e);
+    }
+    HiveTableSchemas.put(conf, profileId, outputDescription.hiveTableName());
+  }
+
+  /**
+   * @param outputTablePartitionString comma-separated key=value pairs
+   * @return Map from partition key to value, or null if the input is null
+   */
+  public static Map<String, String> parsePartitionValues(
+      String outputTablePartitionString) {
+    if (outputTablePartitionString == null) {
+      return null;
+    }
+    Splitter commaSplitter = Splitter.on(',').omitEmptyStrings().trimResults();
+    Splitter equalSplitter = Splitter.on('=').omitEmptyStrings().trimResults();
+    Map<String, String> partitionValues = Maps.newHashMap();
+    for (String keyValStr : commaSplitter.split(outputTablePartitionString)) {
+      List<String> keyVal = Lists.newArrayList(equalSplitter.split(keyValStr));
+      if (keyVal.size() != 2) {
+        throw new IllegalArgumentException(
+            "Unrecognized partition value format: " +
+                outputTablePartitionString);
+      }
+      partitionValues.put(keyVal.get(0), keyVal.get(1));
+    }
+    return partitionValues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 9cfdb20..bdae1dd 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.hive.input.edge;
 
-import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.iterables.EdgeReaderWrapper;
@@ -35,6 +36,12 @@ import com.facebook.hiveio.record.HiveReadableRecord;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS;
+
 /**
  * {@link EdgeInputFormat} for reading edges from Hive.
  *
@@ -51,7 +58,20 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
    */
   public HiveEdgeInputFormat() {
     hiveInputFormat = new HiveApiInputFormat();
-    hiveInputFormat.setMyProfileId(HiveProfiles.EDGE_INPUT_PROFILE_ID);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    super.setConf(conf);
+    HiveUtils.initializeHiveInput(
+        hiveInputFormat,
+        HIVE_EDGE_INPUT_PROFILE_ID.get(conf),
+        HIVE_EDGE_INPUT_DATABASE.get(conf),
+        HIVE_EDGE_INPUT_TABLE.get(conf),
+        HIVE_EDGE_INPUT_PARTITION.get(conf),
+        HIVE_EDGE_SPLITS.get(conf),
+        conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 645846c..55500cb 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.hive.input.vertex;
 
-import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.iterables.VertexReaderWrapper;
@@ -35,6 +36,12 @@ import com.facebook.hiveio.record.HiveReadableRecord;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_SPLITS;
+
 /**
  * {@link VertexInputFormat} for reading vertices from Hive.
  *
@@ -53,7 +60,20 @@ public class HiveVertexInputFormat<I extends WritableComparable,
    */
   public HiveVertexInputFormat() {
     hiveInputFormat = new HiveApiInputFormat();
-    hiveInputFormat.setMyProfileId(HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    super.setConf(conf);
+    HiveUtils.initializeHiveInput(
+        hiveInputFormat,
+        HIVE_VERTEX_INPUT_PROFILE_ID.get(conf),
+        HIVE_VERTEX_INPUT_DATABASE.get(conf),
+        HIVE_VERTEX_INPUT_TABLE.get(conf),
+        HIVE_VERTEX_INPUT_PARTITION.get(conf),
+        HIVE_VERTEX_SPLITS.get(conf),
+        conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index 5ee1e5d..769c874 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -18,7 +18,10 @@
 
 package org.apache.giraph.hive.output;
 
-import org.apache.giraph.hive.common.HiveProfiles;
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.hadoop.io.Writable;
@@ -31,7 +34,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import com.facebook.hiveio.output.HiveApiOutputFormat;
 import com.facebook.hiveio.record.HiveWritableRecord;
 
-import java.io.IOException;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
 
 /**
  * VertexOutputFormat using Hive
@@ -51,7 +57,19 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
    */
   public HiveVertexOutputFormat() {
     hiveOutputFormat = new HiveApiOutputFormat();
-    hiveOutputFormat.setMyProfileId(HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    super.setConf(conf);
+    HiveUtils.initializeHiveOutput(
+        hiveOutputFormat,
+        HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf),
+        HIVE_VERTEX_OUTPUT_DATABASE.get(conf),
+        HIVE_VERTEX_OUTPUT_TABLE.get(conf),
+        HIVE_VERTEX_OUTPUT_PARTITION.get(conf),
+        conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/903302d1/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index 4677b47..357cf89 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -36,6 +36,7 @@ import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
 
 /**
  * Vertex writer using Hive.
@@ -47,9 +48,6 @@ import java.io.IOException;
 public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
     E extends Writable>
     extends VertexWriter<I, V, E> implements HiveRecordSaver {
-  /** Key in configuration for VertexToHive class */
-  public static final String VERTEX_TO_HIVE_KEY =
-      "giraph.vertex.to.hive.class";
   /** Logger */
   private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class);
   /** Underlying Hive RecordWriter used */
@@ -111,10 +109,9 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    * @throws IOException errors instantiating
    */
   private void instantiateVertexToHiveFromConf() throws IOException {
-    Class<? extends VertexToHive> klass =
-        getConf().getClass(VERTEX_TO_HIVE_KEY, null, VertexToHive.class);
+    Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(getConf());
     if (klass == null) {
-      throw new IOException(VERTEX_TO_HIVE_KEY + " not set in conf");
+      throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() + " not set in conf");
     }
     vertexToHive = ReflectionUtils.newInstance(klass, getConf());
     HiveTableSchemas.configure(vertexToHive, tableSchema);


Mime
View raw message