giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [5/5] git commit: updated refs/heads/trunk to d419f8f
Date Wed, 31 Jul 2013 18:55:50 GMT
GIRAPH-717: HiveJythonRunner with support for pure Jython value types (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d419f8f4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d419f8f4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d419f8f4

Branch: refs/heads/trunk
Commit: d419f8f4f84723e1eadce96168414de9f3ce5677
Parents: 8df1027
Author: Nitay Joffe <nitay@apache.org>
Authored: Wed Jul 31 14:54:31 2013 -0400
Committer: Nitay Joffe <nitay@apache.org>
Committed: Wed Jul 31 14:55:17 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 findbugs-exclude.xml                            |  16 +
 giraph-core/pom.xml                             |   4 +
 .../ByteArrayMessagesPerVertexStore.java        |   2 +-
 .../messages/InMemoryMessageStoreFactory.java   |   2 +-
 .../giraph/comm/messages/MessagesIterable.java  |   2 +-
 .../comm/messages/OneMessagePerVertexStore.java |   2 +-
 .../out_of_core/SequentialFileMessageStore.java |   2 +-
 .../org/apache/giraph/conf/GiraphClasses.java   |  12 +-
 .../apache/giraph/conf/GiraphConfiguration.java |   6 +-
 .../org/apache/giraph/conf/GiraphConstants.java |  11 +
 .../ImmutableClassesGiraphConfiguration.java    |  58 +-
 .../apache/giraph/conf/PerGraphTypeBoolean.java | 160 ++++
 .../conf/PerGraphTypeBooleanConfOption.java     | 114 +++
 .../apache/giraph/conf/PerGraphTypeEnum.java    | 163 ++++
 .../giraph/conf/PerGraphTypeEnumConfOption.java | 133 +++
 .../factories/AbstractMessageValueFactory.java  |   4 +-
 .../factories/DefaultEdgeValueFactory.java      |   6 +-
 .../factories/DefaultVertexIdFactory.java       |   7 +-
 .../factories/DefaultVertexValueFactory.java    |   7 +-
 .../giraph/factories/EdgeValueFactory.java      |   8 +-
 .../giraph/factories/MessageValueFactory.java   |  15 +-
 .../factories/TestMessageValueFactory.java      |   4 +-
 .../apache/giraph/factories/ValueFactory.java   |  54 ++
 .../giraph/factories/ValueFactoryBase.java      |  40 -
 .../giraph/factories/VertexIdFactory.java       |   8 +-
 .../giraph/factories/VertexValueFactory.java    |  10 +-
 .../giraph/graph/AbstractComputation.java       | 287 ++++++
 .../apache/giraph/graph/BasicComputation.java   |   2 +-
 .../org/apache/giraph/graph/Computation.java    | 124 +--
 .../java/org/apache/giraph/graph/GraphType.java | 233 +++++
 .../apache/giraph/jython/JythonComputation.java | 237 +++++
 .../giraph/jython/JythonComputationFactory.java | 112 ---
 .../giraph/jython/JythonGiraphComputation.java  |  99 ++
 .../org/apache/giraph/jython/JythonJob.java     | 408 +++++++++
 .../org/apache/giraph/jython/JythonOptions.java | 104 +++
 .../org/apache/giraph/jython/JythonUtils.java   |  37 +-
 .../factories/JythonComputationFactory.java     | 117 +++
 .../factories/JythonEdgeValueFactory.java       |  41 +
 .../jython/factories/JythonFactoryBase.java     | 134 +++
 .../JythonIncomingMessageValueFactory.java      |  35 +
 .../factories/JythonMessageValueFactory.java    |  40 +
 .../JythonOutgoingMessageValueFactory.java      |  35 +
 .../jython/factories/JythonVertexIdFactory.java |  41 +
 .../factories/JythonVertexValueFactory.java     |  41 +
 .../giraph/jython/factories/package-info.java   |  21 +
 .../jython/wrappers/JythonWrapperBase.java      |  78 ++
 .../jython/wrappers/JythonWritableWrapper.java  |  93 ++
 .../giraph/jython/wrappers/package-info.java    |  21 +
 .../org/apache/giraph/master/MasterCompute.java |   2 +-
 .../apache/giraph/scripting/ScriptLoader.java   |   2 +-
 .../types/BooleanToBooleanWritableWrapper.java  |   4 +-
 .../giraph/types/ByteToByteWritableWrapper.java |   4 +-
 .../giraph/types/ByteToIntWritableWrapper.java  |   4 +-
 .../giraph/types/ByteToLongWritableWrapper.java |   4 +-
 .../types/DoubleToDoubleWritableWrapper.java    |   4 +-
 .../types/FloatToDoubleWritableWrapper.java     |   4 +-
 .../types/FloatToFloatWritableWrapper.java      |   4 +-
 .../giraph/types/IntToIntWritableWrapper.java   |   4 +-
 .../giraph/types/IntToLongWritableWrapper.java  |   4 +-
 .../giraph/types/LongToLongWritableWrapper.java |   4 +-
 .../giraph/types/ShortToIntWritableWrapper.java |   4 +-
 .../types/ShortToLongWritableWrapper.java       |   4 +-
 .../apache/giraph/types/WritableWrapper.java    |   4 +-
 .../giraph/utils/ByteArrayOneToAllMessages.java |   2 +-
 .../giraph/utils/ByteArrayVertexIdMessages.java |   2 +-
 .../apache/giraph/utils/ConfigurationUtils.java |  22 +-
 .../apache/giraph/utils/ReflectionUtils.java    |   6 +-
 .../org/apache/giraph/io/TestEdgeInput.java     |   7 +-
 .../org/apache/giraph/jython/TestJython.java    | 146 ---
 .../apache/giraph/jython/TestJythonBasic.java   |  88 ++
 .../giraph/jython/TestJythonComputation.java    |  82 ++
 .../jython/TestJythonWritableWrapper.java       | 122 +++
 .../master/TestComputationCombinerTypes.java    |   3 +-
 .../apache/giraph/master/TestSwitchClasses.java |  14 +-
 .../giraph/utils/TestReflectionUtils.java       |  11 +-
 .../org/apache/giraph/jython/count-edges.py     |   4 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   2 +-
 .../giraph/vertex/TestComputationTypes.java     |   7 +-
 .../giraph/io/hcatalog/HCatGiraphRunner.java    |   2 +-
 .../giraph/hive/column/HiveReadableColumn.java  | 192 ++++
 .../giraph/hive/column/HiveWritableColumn.java  | 163 ++++
 .../apache/giraph/hive/column/package-info.java |  21 +
 .../giraph/hive/common/GiraphHiveConstants.java |  21 +
 .../apache/giraph/hive/common/HiveUtils.java    |  84 +-
 .../giraph/hive/common/LanguageAndType.java     |  78 ++
 .../hive/input/edge/AbstractHiveToEdge.java     |   7 -
 .../giraph/hive/input/edge/TypedHiveToEdge.java |  97 --
 .../hive/input/vertex/AbstractHiveToVertex.java |   6 -
 .../hive/input/vertex/TypedHiveToVertex.java    |  94 --
 .../giraph/hive/jython/HiveJythonRunner.java    | 126 +++
 .../giraph/hive/jython/HiveJythonUtils.java     | 917 +++++++++++++++++++
 .../giraph/hive/jython/JythonColumnReader.java  |  82 ++
 .../giraph/hive/jython/JythonColumnWriter.java  |  83 ++
 .../apache/giraph/hive/jython/JythonHiveIO.java |  24 +
 .../giraph/hive/jython/JythonHiveReader.java    |  36 +
 .../giraph/hive/jython/JythonHiveToEdge.java    | 107 +++
 .../giraph/hive/jython/JythonHiveToVertex.java  |  99 ++
 .../giraph/hive/jython/JythonHiveWriter.java    |  36 +
 .../hive/jython/JythonReadableColumn.java       | 179 ++++
 .../giraph/hive/jython/JythonVertexToHive.java  |  81 ++
 .../apache/giraph/hive/jython/package-info.java |  22 +
 .../giraph/hive/output/TypedVertexToHive.java   |  77 --
 .../hive/primitives/PrimitiveValueReader.java   |  88 ++
 .../hive/primitives/PrimitiveValueWriter.java   |  88 ++
 .../giraph/hive/primitives/package-info.java    |  21 +
 .../giraph/hive/types/HiveValueReader.java      |  62 --
 .../giraph/hive/types/HiveValueWriter.java      |  59 --
 .../giraph/hive/types/HiveVertexIdReader.java   |  64 --
 .../giraph/hive/types/HiveVertexIdWriter.java   |  60 --
 .../giraph/hive/types/TypedValueReader.java     |  92 --
 .../giraph/hive/types/TypedValueWriter.java     |  84 --
 .../giraph/hive/types/TypedVertexIdReader.java  |  93 --
 .../giraph/hive/types/TypedVertexIdWriter.java  |  84 --
 .../apache/giraph/hive/types/package-info.java  |  21 -
 .../giraph/hive/values/HiveValueReader.java     |  59 ++
 .../giraph/hive/values/HiveValueWriter.java     |  59 ++
 .../apache/giraph/hive/values/package-info.java |  22 +
 .../apache/giraph/hive/GiraphHiveTestBase.java  |   4 +-
 .../java/org/apache/giraph/hive/Helpers.java    |   7 +-
 .../giraph/hive/input/HiveEdgeInputTest.java    |   1 -
 .../giraph/hive/input/HiveVertexInputTest.java  |   1 -
 .../hive/jython/TestHiveJythonComplexTypes.java | 139 +++
 .../hive/jython/TestHiveJythonPrimitives.java   | 110 +++
 .../hive/jython/TestJythonLabelInfluence.java   |   0
 .../giraph/hive/output/HiveOutputTest.java      |   1 -
 .../giraph/jython/count-edges-launcher.py       |  43 +
 .../jython/fake-label-propagation-launcher.py   |  52 ++
 .../jython/fake-label-propagation-worker.py     |  91 ++
 pom.xml                                         |   8 +-
 130 files changed, 6371 insertions(+), 1442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bc3e84d..a17606e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-717: HiveJythonRunner with support for pure Jython value types (nitay)
+
   GIRAPH-722: ProgressableUtils.waitForever is not calling progress (majakabiljo)
 
   GIRAPH-549: Tinkerpop/Blueprints/Rexter InputFormat (armax00 via claudio)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 2f7f400..6cc5f7a 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -60,4 +60,20 @@
   <Match>
     <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>
+  <Match>
+    <Class name="org.apache.giraph.jython.wrappers.JythonWrapperBase"/>
+    <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/>
+  </Match>
+  <Match>
+    <Class name="org.apache.giraph.jython.JythonWritableWrapper"/>
+    <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/>
+  </Match>
+  <Match>
+    <Class name="org.apache.giraph.hive.column.HiveColumnWriter"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+  <Match>
+    <Class name="org.apache.giraph.hive.record.HiveRecordWriter"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index cab0157..b2dcbd1 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -457,6 +457,10 @@ under the License.
       <artifactId>commons-io</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>airline</artifactId>
+    </dependency>
+    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index ba6d020..6518da6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -156,7 +156,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
     @Override
     protected M createWritable() {
-      return messageValueFactory.createMessageValue();
+      return messageValueFactory.newInstance();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 40674e1..0cdfb73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -68,7 +68,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   @Override
   public MessageStore<I, M> newStore(
       MessageValueFactory<M> messageValueFactory) {
-    Class<M> messageClass = messageValueFactory.getMessageValueClass();
+    Class<M> messageClass = messageValueFactory.getValueClass();
     MessageStore messageStore;
     if (conf.useCombiner()) {
       Class<I> vertexIdClass = conf.getVertexIdClass();

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
index f3f327f..a466a8d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -52,6 +52,6 @@ public class MessagesIterable<M extends Writable>
 
   @Override
   protected M createWritable() {
-    return messageValueFactory.createMessageValue();
+    return messageValueFactory.newInstance();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 29260df..4f150da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -107,7 +107,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
 
   @Override
   protected M readFieldsForMessages(DataInput in) throws IOException {
-    M message = messageValueFactory.createMessageValue();
+    M message = messageValueFactory.newInstance();
     message.readFields(in);
     return message;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 1ff0e18..89520ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -298,7 +298,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
     int messagesSize = in.readInt();
     List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
     for (int i = 0; i < messagesSize; i++) {
-      M message = messageValueFactory.createMessageValue();
+      M message = messageValueFactory.newInstance();
       try {
         message.readFields(in);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 6655834..71fe885 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -62,8 +62,8 @@ public class GiraphClasses<I extends WritableComparable,
       ? extends Writable, ? extends Writable>>
   computationFactoryClass;
   /** Computation class - cached for fast access */
-  protected Class<? extends
-      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  protected Class<? extends Computation<I, V, E,
+      ? extends Writable, ? extends Writable>>
   computationClass;
   /** Generic types used to describe graph */
   protected GiraphTypes<I, V, E> giraphTypes;
@@ -149,8 +149,8 @@ public class GiraphClasses<I extends WritableComparable,
             ? extends Writable, ? extends Writable>>)
             COMPUTATION_FACTORY_CLASS.get(conf);
     computationClass =
-        (Class<? extends
-            Computation<I, V, E, ? extends Writable, ? extends Writable>>)
+        (Class<? extends Computation<I, V, E,
+            ? extends Writable, ? extends Writable>>)
             COMPUTATION_CLASS.get(conf);
 
     outEdgesClass = (Class<? extends OutEdges<I, E>>)
@@ -195,8 +195,8 @@ public class GiraphClasses<I extends WritableComparable,
    *
    * @return Computation class.
    */
-  public Class<? extends
-      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  public Class<? extends Computation<I, V, E,
+      ? extends Writable, ? extends Writable>>
   getComputationClass() {
     return computationClass;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index de0539b..23bcd32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -22,10 +22,10 @@ import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.factories.ComputationFactory;
-import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.factories.VertexValueFactory;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -95,7 +95,7 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the user's subclassed {@link Computation}
+   * Get the user's subclassed {@link org.apache.giraph.graph.Computation}
    *
    * @return User's computation class
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index f6233e3..c276c2a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -86,6 +86,17 @@ public interface GiraphConstants {
           TypesHolder.class,
           "TypesHolder, used if Computation not set - optional");
 
+  /** Language user's graph types are implemented in */
+  PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES =
+      PerGraphTypeEnumConfOption.create("giraph.types.language",
+          Language.class, Language.JAVA,
+          "Language user graph types (IVEMM) are implemented in");
+
+  /** Whether user graph types need wrappers */
+  PerGraphTypeBooleanConfOption GRAPH_TYPES_NEEDS_WRAPPERS =
+      new PerGraphTypeBooleanConfOption("giraph.jython.type.wrappers",
+          false, "Whether user graph types (IVEMM) need Jython wrappers");
+
   /** Vertex id factory class - optional */
   ClassConfOption<VertexIdFactory> VERTEX_ID_FACTORY_CLASS =
       ClassConfOption.create("giraph.vertexIdFactoryClass",

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index db24423..49a2ebc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -25,11 +25,14 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReusableEdge;
 import org.apache.giraph.factories.ComputationFactory;
+import org.apache.giraph.factories.EdgeValueFactory;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.factories.ValueFactories;
+import org.apache.giraph.factories.VertexIdFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertex;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -83,8 +86,12 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
-  /** Value Factories */
+  /** Value (IVEMM) Factories */
   private final ValueFactories<I, V, E> valueFactories;
+  /** Language values (IVEMM) are implemented in */
+  private final PerGraphTypeEnum<Language> valueLanguages;
+  /** Whether values (IVEMM) need Jython wrappers */
+  private final PerGraphTypeBoolean valueNeedsWrappers;
 
   /**
    * Use unsafe serialization? Cached for fast access to instantiate the
@@ -102,6 +109,10 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     super(conf);
     classes = new GiraphClasses<I, V, E>(conf);
     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
+    valueLanguages = PerGraphTypeEnum.readFromConf(
+        GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
+    valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
+        GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
     valueFactories = new ValueFactories<I, V, E>(conf);
     valueFactories.initializeIVE(this);
   }
@@ -117,6 +128,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     }
   }
 
+  public PerGraphTypeBoolean getValueNeedsWrappers() {
+    return valueNeedsWrappers;
+  }
+
+  public PerGraphTypeEnum<Language> getValueLanguages() {
+    return valueLanguages;
+  }
+
   /**
    * Get the vertex input filter class
    *
@@ -440,8 +459,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   @Override
-  public Class<? extends
-      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  public Class<? extends Computation<I, V, E,
+      ? extends Writable, ? extends Writable>>
   getComputationClass() {
     return classes.getComputationClass();
   }
@@ -508,12 +527,21 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get vertex ID factory
+   *
+   * @return {@link VertexIdFactory}
+   */
+  public VertexIdFactory<I> getVertexIdFactory() {
+    return valueFactories.getVertexIdFactory();
+  }
+
+  /**
    * Create a user vertex index
    *
    * @return Instantiated user vertex index
    */
   public I createVertexId() {
-    return valueFactories.getVertexIdFactory().createVertexId();
+    return getVertexIdFactory().newInstance();
   }
 
   /**
@@ -526,13 +554,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get vertex value factory
+   *
+   * @return {@link VertexValueFactory}
+   */
+  public VertexValueFactory<V> getVertexValueFactory() {
+    return valueFactories.getVertexValueFactory();
+  }
+
+  /**
    * Create a user vertex value
    *
    * @return Instantiated user vertex value
    */
   @SuppressWarnings("unchecked")
   public V createVertexValue() {
-    return valueFactories.getVertexValueFactory().createVertexValue();
+    return getVertexValueFactory().newInstance();
   }
 
   /**
@@ -601,12 +638,21 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get Factory for creating edge values
+   *
+   * @return {@link EdgeValueFactory}
+   */
+  public EdgeValueFactory<E> getEdgeValueFactory() {
+    return valueFactories.getEdgeValueFactory();
+  }
+
+  /**
    * Create a user edge value
    *
    * @return Instantiated user edge value
    */
   public E createEdgeValue() {
-    return valueFactories.getEdgeValueFactory().createEdgeValue();
+    return getEdgeValueFactory().newInstance();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
new file mode 100644
index 0000000..375d054
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.GraphType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A boolean stored per user graph type
+ */
+public class PerGraphTypeBoolean {
+  /** data for vertex id */
+  private boolean vertexId;
+  /** data for vertex value */
+  private boolean vertexValue;
+  /** data for edge value */
+  private boolean edgeValue;
+  /** data for incoming message */
+  private boolean incomingMessage;
+  /** data for outgoing message */
+  private boolean outgoingMessage;
+
+  /**
+   * Create from options and configuration
+   *
+   * @param options per user graph type options
+   * @param conf configuration
+   * @return new object
+   */
+  public static PerGraphTypeBoolean readFromConf(
+      PerGraphTypeBooleanConfOption options, Configuration conf) {
+    PerGraphTypeBoolean pgtb = new PerGraphTypeBoolean();
+    pgtb.setFrom(options, conf);
+    return pgtb;
+  }
+
+  /**
+   * Set data from per user graph type set of options
+   *
+   * @param options per user graph type options
+   * @param conf Configuration
+   */
+  public void setFrom(PerGraphTypeBooleanConfOption options,
+      Configuration conf) {
+    setVertexId(options.getVertexId(), conf);
+    setVertexValue(options.getVertexValue(), conf);
+    setEdgeValue(options.getEdgeValue(), conf);
+    setIncomingMessage(options.getIncomingMessage(), conf);
+    setOutgoingMessage(options.getOutgoingMessage(), conf);
+  }
+
+  /**
+   * Set the vertex id data from the option
+   *
+   * @param option BooleanConfOption option to use
+   * @param conf Configuration
+   */
+  public void setVertexId(BooleanConfOption option, Configuration conf) {
+    vertexId = option.get(conf);
+  }
+
+  /**
+   * Set the vertex value data from the option
+   *
+   * @param option BooleanConfOption option to use
+   * @param conf Configuration
+   */
+  public void setVertexValue(BooleanConfOption option, Configuration conf) {
+    vertexValue = option.get(conf);
+  }
+
+  /**
+   * Set the edge value data from the option
+   *
+   * @param option BooleanConfOption option to use
+   * @param conf Configuration
+   */
+  public void setEdgeValue(BooleanConfOption option, Configuration conf) {
+    edgeValue = option.get(conf);
+  }
+
+  /**
+   * Set the incoming message value data from the option
+   *
+   * @param option BooleanConfOption option to use
+   * @param conf Configuration
+   */
+  public void setIncomingMessage(BooleanConfOption option, Configuration conf) {
+    incomingMessage = option.get(conf);
+  }
+
+  /**
+   * Set the outgoing message value data from the option
+   *
+   * @param option BooleanConfOption option to use
+   * @param conf Configuration
+   */
+  public void setOutgoingMessage(BooleanConfOption option, Configuration conf) {
+    outgoingMessage = option.get(conf);
+  }
+
+  /**
+   * Get data for given GraphType
+   *
+   * @param graphType GraphType
+   * @return data for given graph type
+   */
+  public boolean get(GraphType graphType) {
+    switch (graphType) {
+    case VERTEX_ID:
+      return vertexId;
+    case VERTEX_VALUE:
+      return vertexValue;
+    case EDGE_VALUE:
+      return edgeValue;
+    case INCOMING_MESSAGE_VALUE:
+      return incomingMessage;
+    case OUTGOING_MESSAGE_VALUE:
+      return outgoingMessage;
+    default:
+      throw new IllegalArgumentException(
+          "Don't know how to handle GraphType " + graphType);
+    }
+  }
+
+  public boolean getEdgeValue() {
+    return edgeValue;
+  }
+
+  public boolean getIncomingMessage() {
+    return incomingMessage;
+  }
+
+  public boolean getOutgoingMessage() {
+    return outgoingMessage;
+  }
+
+  public boolean getVertexId() {
+    return vertexId;
+  }
+
+  public boolean getVertexValue() {
+    return vertexValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
new file mode 100644
index 0000000..adfa979
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.GraphType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Boolean Configuration option per user graph type (IVEMM)
+ */
+public class PerGraphTypeBooleanConfOption {
+  /** option for vertex id */
+  private final BooleanConfOption vertexId;
+  /** option for vertex value */
+  private final BooleanConfOption vertexValue;
+  /** option for edge value */
+  private final BooleanConfOption edgeValue;
+  /** option for incoming message */
+  private final BooleanConfOption incomingMessage;
+  /** option for outgoing message */
+  private final BooleanConfOption outgoingMessage;
+
+  /**
+   * Constructor
+   *
+   * @param keyPrefix Configuration key prefix
+   * @param defaultValue default value
+   * @param description description of the option
+   */
+  public PerGraphTypeBooleanConfOption(String keyPrefix,
+      boolean defaultValue, String description) {
+    vertexId = new BooleanConfOption(keyPrefix + ".vertex.id",
+        defaultValue, description);
+    vertexValue = new BooleanConfOption(keyPrefix + ".vertex.value",
+        defaultValue, description);
+    edgeValue = new BooleanConfOption(keyPrefix + ".edge.value",
+        defaultValue, description);
+    incomingMessage = new BooleanConfOption(keyPrefix + ".incoming.message",
+        defaultValue, description);
+    outgoingMessage = new BooleanConfOption(keyPrefix + ".outgoing.message",
+        defaultValue, description);
+  }
+
+  /**
+   * Get option for given GraphType
+   *
+   * @param graphType GraphType
+   * @return BooleanConfOption for given graph type
+   */
+  public BooleanConfOption get(GraphType graphType) {
+    switch (graphType) {
+    case VERTEX_ID:
+      return vertexId;
+    case VERTEX_VALUE:
+      return vertexValue;
+    case EDGE_VALUE:
+      return edgeValue;
+    case INCOMING_MESSAGE_VALUE:
+      return incomingMessage;
+    case OUTGOING_MESSAGE_VALUE:
+      return outgoingMessage;
+    default:
+      throw new IllegalArgumentException(
+          "Don't know how to handle GraphType " + graphType);
+    }
+  }
+
+  /**
+   * Set value for given GraphType
+   *
+   * @param conf Configuration
+   * @param graphType GraphType
+   * @param value data
+   */
+  public void set(Configuration conf, GraphType graphType, boolean value) {
+    get(graphType).set(conf, value);
+  }
+
+  public BooleanConfOption getEdgeValue() {
+    return edgeValue;
+  }
+
+  public BooleanConfOption getIncomingMessage() {
+    return incomingMessage;
+  }
+
+  public BooleanConfOption getOutgoingMessage() {
+    return outgoingMessage;
+  }
+
+  public BooleanConfOption getVertexId() {
+    return vertexId;
+  }
+
+  public BooleanConfOption getVertexValue() {
+    return vertexValue;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
new file mode 100644
index 0000000..7003709
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.GraphType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An enum stored per user graph type
+ *
+ * @param <T> Enum type
+ */
+public class PerGraphTypeEnum<T extends Enum<T>> {
+  /** data for vertex id */
+  private T vertexId;
+  /** data for vertex value */
+  private T vertexValue;
+  /** data for edge value */
+  private T edgeValue;
+  /** data for incoming message */
+  private T incomingMessage;
+  /** data for outgoing message */
+  private T outgoingMessage;
+
+  /**
+   * Create from options and configuration
+   *
+   * @param <T> enum type
+   * @param options pre user graph type options
+   * @param conf configuration
+   * @return new object
+   */
+  public static <T extends Enum<T>> PerGraphTypeEnum<T> readFromConf(
+      PerGraphTypeEnumConfOption<T> options, Configuration conf) {
+    PerGraphTypeEnum<T> pgte = new PerGraphTypeEnum<T>();
+    pgte.setFrom(options, conf);
+    return pgte;
+  }
+
+  /**
+   * Set data from per user graph type set of options
+   *
+   * @param options per user graph type options
+   * @param conf Configuration
+   */
+  public void setFrom(PerGraphTypeEnumConfOption<T> options,
+      Configuration conf) {
+    setVertexId(options.getVertexId(), conf);
+    setVertexValue(options.getVertexValue(), conf);
+    setEdgeValue(options.getEdgeValue(), conf);
+    setIncomingMessage(options.getIncomingMessage(), conf);
+    setOutgoingMessage(options.getOutgoingMessage(), conf);
+  }
+
+  /**
+   * Set the vertex id data from the option
+   *
+   * @param option EnumConfOption option to use
+   * @param conf Configuration
+   */
+  public void setVertexId(EnumConfOption<T> option, Configuration conf) {
+    vertexId = option.get(conf);
+  }
+
+  /**
+   * Set the vertex value data from the option
+   *
+   * @param option EnumConfOption option to use
+   * @param conf Configuration
+   */
+  public void setVertexValue(EnumConfOption<T> option, Configuration conf) {
+    vertexValue = option.get(conf);
+  }
+
+  /**
+   * Set the edge value data from the option
+   *
+   * @param option EnumConfOption option to use
+   * @param conf Configuration
+   */
+  public void setEdgeValue(EnumConfOption<T> option, Configuration conf) {
+    edgeValue = option.get(conf);
+  }
+
+  /**
+   * Set the incoming message value data from the option
+   *
+   * @param option EnumConfOption option to use
+   * @param conf Configuration
+   */
+  public void setIncomingMessage(EnumConfOption<T> option, Configuration conf) {
+    incomingMessage = option.get(conf);
+  }
+
+  /**
+   * Set the outgoing message value data from the option
+   *
+   * @param option EnumConfOption option to use
+   * @param conf Configuration
+   */
+  public void setOutgoingMessage(EnumConfOption<T> option, Configuration conf) {
+    outgoingMessage = option.get(conf);
+  }
+
+  /**
+   * Get data for given GraphType
+   *
+   * @param graphType GraphType
+   * @return data for given graph type
+   */
+  public T get(GraphType graphType) {
+    switch (graphType) {
+    case VERTEX_ID:
+      return vertexId;
+    case VERTEX_VALUE:
+      return vertexValue;
+    case EDGE_VALUE:
+      return edgeValue;
+    case INCOMING_MESSAGE_VALUE:
+      return incomingMessage;
+    case OUTGOING_MESSAGE_VALUE:
+      return outgoingMessage;
+    default:
+      throw new IllegalArgumentException(
+          "Don't know how to handle GraphType " + graphType);
+    }
+  }
+
+  public T getEdgeValue() {
+    return edgeValue;
+  }
+
+  public T getIncomingMessage() {
+    return incomingMessage;
+  }
+
+  public T getOutgoingMessage() {
+    return outgoingMessage;
+  }
+
+  public T getVertexId() {
+    return vertexId;
+  }
+
+  public T getVertexValue() {
+    return vertexValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
new file mode 100644
index 0000000..8ae4576
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.GraphType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enum Configuration option per user graph type (IVEMM)
+ *
+ * @param <T> Enum class
+ */
+public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
+  /** option for vertex id */
+  private final EnumConfOption<T> vertexId;
+  /** option for vertex value */
+  private final EnumConfOption<T> vertexValue;
+  /** option for edge value */
+  private final EnumConfOption<T> edgeValue;
+  /** option for incoming message */
+  private final EnumConfOption<T> incomingMessage;
+  /** option for outgoing message */
+  private final EnumConfOption<T> outgoingMessage;
+
+  /**
+   * Constructor
+   *
+   * @param keyPrefix Configuration key prefix
+   * @param klass Enum class
+   * @param defaultValue default value
+   * @param description description of the option
+   */
+  public PerGraphTypeEnumConfOption(String keyPrefix, Class<T> klass,
+      T defaultValue, String description) {
+    vertexId = EnumConfOption.create(keyPrefix + ".vertex.id", klass,
+        defaultValue, description);
+    vertexValue = EnumConfOption.create(keyPrefix + ".vertex.value", klass,
+        defaultValue, description);
+    edgeValue = EnumConfOption.create(keyPrefix + ".edge.value",
+        klass, defaultValue, description);
+    incomingMessage = EnumConfOption.create(keyPrefix + ".incoming.message",
+        klass, defaultValue, description);
+    outgoingMessage = EnumConfOption.create(keyPrefix + ".outgoing.message",
+        klass, defaultValue, description);
+  }
+
+  /**
+   * Create new PerGraphTypeEnumConfOption
+   *
+   * @param keyPrefix String configuration key prefix
+   * @param klass enum class
+   * @param defaultValue default enum value
+   * @param description description of the option
+   * @param <X> enum type
+   * @return PerGraphTypeEnumConfOption
+   */
+  public static <X extends Enum<X>> PerGraphTypeEnumConfOption<X>
+  create(String keyPrefix, Class<X> klass, X defaultValue, String description) {
+    return new PerGraphTypeEnumConfOption<X>(keyPrefix, klass,
+        defaultValue, description);
+  }
+
+  /**
+   * Get option for given GraphType
+   *
+   * @param graphType GraphType
+   * @return EnumConfOption for given graph type
+   */
+  public EnumConfOption<T> get(GraphType graphType) {
+    switch (graphType) {
+    case VERTEX_ID:
+      return vertexId;
+    case VERTEX_VALUE:
+      return vertexValue;
+    case EDGE_VALUE:
+      return edgeValue;
+    case INCOMING_MESSAGE_VALUE:
+      return incomingMessage;
+    case OUTGOING_MESSAGE_VALUE:
+      return outgoingMessage;
+    default:
+      throw new IllegalArgumentException(
+          "Don't know how to handle GraphType " + graphType);
+    }
+  }
+
+  /**
+   * Set value for given GraphType
+   *
+   * @param conf Configuration
+   * @param graphType GraphType
+   * @param language enum value to set for the given graph type
+   */
+  public void set(Configuration conf, GraphType graphType, T language) {
+    get(graphType).set(conf, language);
+  }
+
+  public EnumConfOption<T> getEdgeValue() {
+    return edgeValue;
+  }
+
+  public EnumConfOption<T> getIncomingMessage() {
+    return incomingMessage;
+  }
+
+  public EnumConfOption<T> getOutgoingMessage() {
+    return outgoingMessage;
+  }
+
+  public EnumConfOption<T> getVertexId() {
+    return vertexId;
+  }
+
+  public EnumConfOption<T> getVertexValue() {
+    return vertexValue;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
index f426d5f..dcc286f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
@@ -43,7 +43,7 @@ public abstract class AbstractMessageValueFactory<M extends Writable>
       ImmutableClassesGiraphConfiguration conf);
 
   @Override
-  public Class<M> getMessageValueClass() {
+  public Class<M> getValueClass() {
     return messageValueClass;
   }
 
@@ -52,7 +52,7 @@ public abstract class AbstractMessageValueFactory<M extends Writable>
     messageValueClass = extractMessageValueClass(conf);
   }
 
-  @Override public M createMessageValue() {
+  @Override public M newInstance() {
     return WritableUtils.createWritable(messageValueClass);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
index 55d27e7..c392c87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
@@ -37,7 +37,11 @@ public class DefaultEdgeValueFactory<E extends Writable>
     edgeValueClass = conf.getEdgeValueClass();
   }
 
-  @Override public E createEdgeValue() {
+  @Override public Class<E> getValueClass() {
+    return edgeValueClass;
+  }
+
+  @Override public E newInstance() {
     return WritableUtils.createWritable(edgeValueClass);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
index 4977b44..6dafc7c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
@@ -38,7 +38,12 @@ public class DefaultVertexIdFactory<I extends WritableComparable>
   }
 
   @Override
-  public I createVertexId() {
+  public Class<I> getValueClass() {
+    return vertexIdClass;
+  }
+
+  @Override
+  public I newInstance() {
     return WritableUtils.createWritable(vertexIdClass);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
index caf312f..2fe61ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
@@ -39,7 +39,12 @@ public class DefaultVertexValueFactory<V extends Writable>
   }
 
   @Override
-  public V createVertexValue() {
+  public Class<V> getValueClass() {
+    return vertexValueClass;
+  }
+
+  @Override
+  public V newInstance() {
     return WritableUtils.createWritable(vertexValueClass);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/EdgeValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/EdgeValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/EdgeValueFactory.java
index 751c6e6..9c5bb07 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/EdgeValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/EdgeValueFactory.java
@@ -27,11 +27,5 @@ import org.apache.hadoop.io.Writable;
  * @param <E> Edge value
  */
 public interface EdgeValueFactory<E extends Writable>
-    extends ValueFactoryBase<E> {
-  /**
-   * Create a new edge value.
-   *
-   * @return new edge value.
-   */
-  E createEdgeValue();
+    extends ValueFactory<E> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/MessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/MessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/MessageValueFactory.java
index a95e1a2..e94745b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/MessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/MessageValueFactory.java
@@ -27,18 +27,5 @@ import org.apache.hadoop.io.Writable;
  * @param <M> Message value
  */
 public interface MessageValueFactory<M extends Writable>
-    extends ValueFactoryBase<M> {
-  /**
-   * Get the java Class representing messages this factory creates
-   *
-   * @return Class<M>
-   */
-  Class<M> getMessageValueClass();
-
-  /**
-   * Create a new message value.
-   *
-   * @return new message value.
-   */
-  M createMessageValue();
+    extends ValueFactory<M> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
index 3d376de..806664b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
@@ -40,14 +40,14 @@ public class TestMessageValueFactory<M extends Writable>
     this.klass = klass;
   }
 
-  @Override public Class<M> getMessageValueClass() {
+  @Override public Class<M> getValueClass() {
     return klass;
   }
 
   @Override public void initialize(
       ImmutableClassesGiraphConfiguration conf) { }
 
-  @Override public M createMessageValue() {
+  @Override public M newInstance() {
     return ReflectionUtils.newInstance(klass);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
new file mode 100644
index 0000000..5725061
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.factories;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Base interface for factories creating user graph value types (IVEMM)
+ *
+ * @param <W> Writable type
+ */
+public interface ValueFactory<W extends Writable> {
+  /**
+   * Initialize factory settings from the conf.
+   * This gets called on startup and also if there are changes to the message
+   * classes used. For example if the user's
+   * {@link org.apache.giraph.master.MasterCompute} changes the
+   * {@link org.apache.giraph.graph.Computation} and the next superstep has a
+   * different message value type.
+   *
+   * @param conf Configuration
+   */
+  void initialize(ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Create a new value.
+   *
+   * @return new value.
+   */
+  W newInstance();
+
+  /**
+   * Get the java Class representing values this factory creates
+   *
+   * @return Class<W>
+   */
+  Class<W> getValueClass();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactoryBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactoryBase.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactoryBase.java
deleted file mode 100644
index f92fb74..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactoryBase.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Base interface for factories creating user graph value types (IVEMM)
- *
- * @param <W> Writable type
- */
-public interface ValueFactoryBase<W extends Writable> {
-  /**
-   * Initialize factory settings from the conf.
-   * This gets called on startup and also if there are changes to the message
-   * classes used. For example if the user's
-   * {@link org.apache.giraph.master.MasterCompute} changes the
-   * {@link org.apache.giraph.graph.Computation} and the next superstep has a
-   * different message value type.
-   *
-   * @param conf Configuration
-   */
-  void initialize(ImmutableClassesGiraphConfiguration conf);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/VertexIdFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/VertexIdFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/VertexIdFactory.java
index 328bda8..a119dc4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/VertexIdFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/VertexIdFactory.java
@@ -27,12 +27,6 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex ID
  */
 public interface VertexIdFactory<I extends WritableComparable>
-    extends ValueFactoryBase<I> {
-  /**
-   * Create a new edge value.
-   *
-   * @return new edge value.
-   */
-  I createVertexId();
+    extends ValueFactory<I> {
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/factories/VertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/VertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/VertexValueFactory.java
index d2f62ab..ba12b84 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/VertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/VertexValueFactory.java
@@ -29,13 +29,5 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value
  */
 public interface VertexValueFactory<V extends Writable>
-    extends ValueFactoryBase<V> {
-  /**
-   * Create a new vertex value.
-   *
-   * @return new vertex value.
-   */
-  V createVertexValue();
+    extends ValueFactory<V> {
 }
-
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
new file mode 100644
index 0000000..e7c3084
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * See {@link Computation} for explanation of the interface.
+ *
+ * This is an abstract class helper for users to implement their computations.
+ * It implements all of the methods required by the {@link Computation}
+ * interface except for the {@link #compute(Vertex, Iterable)} which we leave
+ * to the user to define.
+ *
+ * In most cases users should inherit from this class when implementing their
+ * algorithms with Giraph.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
+ */
+public abstract class AbstractComputation<I extends WritableComparable,
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements Computation<I, V, E, M1, M2> {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(AbstractComputation.class);
+
+  /** Global graph state **/
+  private GraphState graphState;
+  /** Handles requests */
+  private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
+  /** Graph-wide BSP Mapper for this Computation */
+  private GraphTaskManager<I, V, E> graphTaskManager;
+  /** Worker aggregator usage */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+  /** Worker context */
+  private WorkerContext workerContext;
+
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param vertex   Vertex
+   * @param messages Messages that were sent to this vertex in the previous
+   *                 superstep.  Each message is only guaranteed to have
+   *                 a life expectancy as long as next() is not called.
+   */
+  public abstract void compute(Vertex<I, V, E> vertex,
+      Iterable<M1> messages) throws IOException;
+
+  /**
+   * Prepare for computation. This method is executed exactly once prior to
+   * {@link #compute(Vertex, Iterable)} being called for any of the vertices
+   * in the partition.
+   */
+  @Override
+  public void preSuperstep() {
+  }
+
+  /**
+   * Finish computation. This method is executed exactly once after computation
+   * for all vertices in the partition is complete.
+   */
+  @Override
+  public void postSuperstep() {
+  }
+
+  /**
+   * Initialize, called by infrastructure before the superstep starts.
+   * Shouldn't be called by user code.
+   *
+   * @param graphState Graph state
+   * @param workerClientRequestProcessor Processor for handling requests
+   * @param graphTaskManager Graph-wide BSP Mapper for this Computation
+   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerContext Worker context
+   */
+  @Override
+  public void initialize(
+      GraphState graphState,
+      WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
+      GraphTaskManager<I, V, E> graphTaskManager,
+      WorkerAggregatorUsage workerAggregatorUsage,
+      WorkerContext workerContext) {
+    this.graphState = graphState;
+    this.workerClientRequestProcessor = workerClientRequestProcessor;
+    this.graphTaskManager = graphTaskManager;
+    this.workerAggregatorUsage = workerAggregatorUsage;
+    this.workerContext = workerContext;
+  }
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  @Override
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  @Override
+  public long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  @Override
+  public long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
+  }
+
+  /**
+   * Send a message to a vertex id.
+   *
+   * @param id Vertex id to send the message to
+   * @param message Message data to send
+   */
+  @Override
+  public void sendMessage(I id, M2 message) {
+    workerClientRequestProcessor.sendMessageRequest(id, message);
+  }
+
+  /**
+   * Send a message to all edges.
+   *
+   * @param vertex Vertex whose edges to send the message to.
+   * @param message Message sent to all edges.
+   */
+  @Override
+  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+    workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
+  }
+
+  /**
+   * Send a message to multiple target vertex ids in the iterator.
+   *
+   * @param vertexIdIterator An iterator to multiple target vertex ids.
+   * @param message Message sent to all targets in the iterator.
+   */
+  @Override
+  public void sendMessageToMultipleEdges(
+      Iterator<I> vertexIdIterator, M2 message) {
+    workerClientRequestProcessor.sendMessageToAllRequest(
+        vertexIdIterator, message);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   * @param edges Initial edges
+   */
+  @Override
+  public void addVertexRequest(I id, V value,
+      OutEdges<I, E> edges) throws IOException {
+    Vertex<I, V, E> vertex = getConf().createVertex();
+    vertex.initialize(id, value, edges);
+    workerClientRequestProcessor.addVertexRequest(vertex);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  @Override
+  public void addVertexRequest(I id, V value) throws IOException {
+    addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
+  }
+
+  /**
+   * Request to remove a vertex from the graph
+   * (applied just prior to the next superstep).
+   *
+   * @param vertexId Id of the vertex to be removed.
+   */
+  @Override
+  public void removeVertexRequest(I vertexId) throws IOException {
+    workerClientRequestProcessor.removeVertexRequest(vertexId);
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param edge Edge to add
+   */
+  @Override
+  public void addEdgeRequest(I sourceVertexId,
+      Edge<I, E> edge) throws IOException {
+    workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
+  }
+
+  /**
+   * Request to remove all edges from a given source vertex to a given target
+   * vertex (processed just prior to the next superstep).
+   *
+   * @param sourceVertexId Source vertex id
+   * @param targetVertexId Target vertex id
+   */
+  @Override
+  public void removeEdgesRequest(I sourceVertexId,
+      I targetVertexId) throws IOException {
+    workerClientRequestProcessor.removeEdgesRequest(
+        sourceVertexId, targetVertexId);
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  @Override
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  /**
+   * Get the worker context
+   *
+   * @param <W> WorkerContext class
+   * @return WorkerContext context
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <W extends WorkerContext> W getWorkerContext() {
+    return (W) workerContext;
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage.<A>getAggregatedValue(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
index 180c5d3..80baa4d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
@@ -31,5 +31,5 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public abstract class BasicComputation<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends Computation<I, V, E, M, M> {
+    extends AbstractComputation<I, V, E, M, M> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 34b0109..7a7b40f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.graph;
 
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
@@ -28,15 +27,14 @@ import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * Basic abstract class for writing a BSP application for computation.
+ * Interface for an application for computation.
  *
- * During the superstep there can be several instances of this class,
+ * During the superstep there can be several instances of this interface,
  * each doing computation on one partition of the graph's vertices.
  *
  * Note that each thread will have its own {@link Computation},
@@ -44,7 +42,7 @@ import java.util.Iterator;
  * However, accessing global data (like data from {@link WorkerContext})
  * is not thread-safe.
  *
- * Objects of this class only live for a single superstep.
+ * Objects of this interface only live for a single superstep.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -52,25 +50,12 @@ import java.util.Iterator;
  * @param <M1> Incoming message type
  * @param <M2> Outgoing message type
  */
-public abstract class Computation<I extends WritableComparable,
+public interface Computation<I extends WritableComparable,
     V extends Writable, E extends Writable, M1 extends Writable,
     M2 extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements TypesHolder<I, V, E, M1, M2>, WorkerAggregatorUsage {
-  /** Logger */
-  private static final Logger LOG = Logger.getLogger(Computation.class);
-
-  /** Global graph state **/
-  private GraphState graphState;
-  /** Handles requests */
-  private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
-  /** Graph-wide BSP Mapper for this Computation */
-  private GraphTaskManager<I, V, E> graphTaskManager;
-  /** Worker aggregator usage */
-  private WorkerAggregatorUsage workerAggregatorUsage;
-  /** Worker context */
-  private WorkerContext workerContext;
-
+    extends TypesHolder<I, V, E, M1, M2>,
+    ImmutableClassesGiraphConfigurable<I, V, E>,
+    WorkerAggregatorUsage {
   /**
    * Must be defined by user to do computation on a single Vertex.
    *
@@ -79,23 +64,21 @@ public abstract class Computation<I extends WritableComparable,
    *                 superstep.  Each message is only guaranteed to have
    *                 a life expectancy as long as next() is not called.
    */
-  public abstract void compute(Vertex<I, V, E> vertex,
-      Iterable<M1> messages) throws IOException;
+  void compute(Vertex<I, V, E> vertex, Iterable<M1> messages)
+    throws IOException;
 
   /**
    * Prepare for computation. This method is executed exactly once prior to
    * {@link #compute(Vertex, Iterable)} being called for any of the vertices
    * in the partition.
    */
-  public void preSuperstep() {
-  }
+  void preSuperstep();
 
   /**
    * Finish computation. This method is executed exactly once after computation
    * for all vertices in the partition is complete.
    */
-  public void postSuperstep() {
-  }
+  void postSuperstep();
 
   /**
    * Initialize, called by infrastructure before the superstep starts.
@@ -107,27 +90,17 @@ public abstract class Computation<I extends WritableComparable,
    * @param workerAggregatorUsage Worker aggregator usage
    * @param workerContext Worker context
    */
-  public void initialize(
-      GraphState graphState,
+  void initialize(GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerAggregatorUsage workerAggregatorUsage,
-      WorkerContext workerContext) {
-    this.graphState = graphState;
-    this.workerClientRequestProcessor = workerClientRequestProcessor;
-    this.graphTaskManager = graphTaskManager;
-    this.workerAggregatorUsage = workerAggregatorUsage;
-    this.workerContext = workerContext;
-  }
+      WorkerAggregatorUsage workerAggregatorUsage, WorkerContext workerContext);
 
   /**
    * Retrieves the current superstep.
    *
    * @return Current superstep
    */
-  public long getSuperstep() {
-    return graphState.getSuperstep();
-  }
+  long getSuperstep();
 
   /**
    * Get the total (all workers) number of vertices that
@@ -135,9 +108,7 @@ public abstract class Computation<I extends WritableComparable,
    *
    * @return Total number of vertices (-1 if first superstep)
    */
-  public long getTotalNumVertices() {
-    return graphState.getTotalNumVertices();
-  }
+  long getTotalNumVertices();
 
   /**
    * Get the total (all workers) number of edges that
@@ -145,9 +116,7 @@ public abstract class Computation<I extends WritableComparable,
    *
    * @return Total number of edges (-1 if first superstep)
    */
-  public long getTotalNumEdges() {
-    return graphState.getTotalNumEdges();
-  }
+  long getTotalNumEdges();
 
   /**
    * Send a message to a vertex id.
@@ -155,9 +124,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param id Vertex id to send the message to
    * @param message Message data to send
    */
-  public void sendMessage(I id, M2 message) {
-    workerClientRequestProcessor.sendMessageRequest(id, message);
-  }
+  void sendMessage(I id, M2 message);
 
   /**
    * Send a message to all edges.
@@ -165,9 +132,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param vertex Vertex whose edges to send the message to.
    * @param message Message sent to all edges.
    */
-  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
-    workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
-  }
+  void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message);
 
   /**
    * Send a message to multiple target vertex ids in the iterator.
@@ -175,11 +140,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param vertexIdIterator An iterator to multiple target vertex ids.
    * @param message Message sent to all targets in the iterator.
    */
-  public void sendMessageToMultipleEdges(
-    Iterator<I> vertexIdIterator, M2 message) {
-    workerClientRequestProcessor.sendMessageToAllRequest(
-      vertexIdIterator, message);
-  }
+  void sendMessageToMultipleEdges(Iterator<I> vertexIdIterator, M2 message);
 
   /**
    * Sends a request to create a vertex that will be available during the
@@ -189,12 +150,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param value Vertex value
    * @param edges Initial edges
    */
-  public void addVertexRequest(I id, V value,
-      OutEdges<I, E> edges) throws IOException {
-    Vertex<I, V, E> vertex = getConf().createVertex();
-    vertex.initialize(id, value, edges);
-    workerClientRequestProcessor.addVertexRequest(vertex);
-  }
+  void addVertexRequest(I id, V value, OutEdges<I, E> edges) throws IOException;
 
   /**
    * Sends a request to create a vertex that will be available during the
@@ -203,9 +159,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param id Vertex id
    * @param value Vertex value
    */
-  public void addVertexRequest(I id, V value) throws IOException {
-    addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
-  }
+  void addVertexRequest(I id, V value) throws IOException;
 
   /**
    * Request to remove a vertex from the graph
@@ -213,9 +167,7 @@ public abstract class Computation<I extends WritableComparable,
    *
    * @param vertexId Id of the vertex to be removed.
    */
-  public void removeVertexRequest(I vertexId) throws IOException {
-    workerClientRequestProcessor.removeVertexRequest(vertexId);
-  }
+  void removeVertexRequest(I vertexId) throws IOException;
 
   /**
    * Request to add an edge of a vertex in the graph
@@ -224,10 +176,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param sourceVertexId Source vertex id of edge
    * @param edge Edge to add
    */
-  public void addEdgeRequest(I sourceVertexId,
-      Edge<I, E> edge) throws IOException {
-    workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
-  }
+  void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) throws IOException;
 
   /**
    * Request to remove all edges from a given source vertex to a given target
@@ -236,20 +185,15 @@ public abstract class Computation<I extends WritableComparable,
    * @param sourceVertexId Source vertex id
    * @param targetVertexId Target vertex id
    */
-  public void removeEdgesRequest(I sourceVertexId,
-      I targetVertexId) throws IOException {
-    workerClientRequestProcessor.removeEdgesRequest(
-        sourceVertexId, targetVertexId);
-  }
+  void removeEdgesRequest(I sourceVertexId, I targetVertexId)
+    throws IOException;
 
   /**
    * Get the mapper context
    *
    * @return Mapper context
    */
-  public Mapper.Context getContext() {
-    return graphState.getContext();
-  }
+  Mapper.Context getContext();
 
   /**
    * Get the worker context
@@ -258,17 +202,5 @@ public abstract class Computation<I extends WritableComparable,
    * @return WorkerContext context
    */
   @SuppressWarnings("unchecked")
-  public <W extends WorkerContext> W getWorkerContext() {
-    return (W) workerContext;
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
+  <W extends WorkerContext> W getWorkerContext();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
new file mode 100644
index 0000000..4a0ac8f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.ValueFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Enumeration for a user graph type (IVEMM)
+ */
+public enum GraphType {
+  /** Vertex ID */
+  VERTEX_ID {
+    @Override
+    public ClassConfOption<? extends Writable> writableConfOption() {
+      return GiraphConstants.VERTEX_ID_CLASS;
+    }
+    @Override
+    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
+      return GiraphConstants.VERTEX_ID_FACTORY_CLASS;
+    }
+    @Override
+    public <T extends Writable> Class<T> get(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getVertexIdClass();
+    }
+    @Override
+    public <T extends Writable> ValueFactory<T> factory(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getVertexIdFactory();
+    }
+  },
+  /** Vertex Value */
+  VERTEX_VALUE {
+    @Override public ClassConfOption<? extends Writable> writableConfOption() {
+      return GiraphConstants.VERTEX_VALUE_CLASS;
+    }
+    @Override
+    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
+      return GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
+    }
+    @Override
+    public <T extends Writable> Class<T> get(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getVertexValueClass();
+    }
+    @Override
+    public <T extends Writable> ValueFactory<T> factory(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getVertexValueFactory();
+    }
+  },
+  /** Edge Value */
+  EDGE_VALUE {
+    @Override
+    public ClassConfOption<? extends Writable> writableConfOption() {
+      return GiraphConstants.EDGE_VALUE_CLASS;
+    }
+    @Override
+    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
+      return GiraphConstants.EDGE_VALUE_FACTORY_CLASS;
+    }
+    @Override
+    public <T extends Writable> Class<T> get(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getEdgeValueClass();
+    }
+    @Override
+    public <T extends Writable> ValueFactory<T> factory(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getEdgeValueFactory();
+    }
+  },
+  /** Incoming message value */
+  INCOMING_MESSAGE_VALUE {
+    @Override
+    public ClassConfOption<? extends Writable> writableConfOption() {
+      return GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
+    }
+    @Override
+    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
+      return GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
+    }
+    @Override
+    public <T extends Writable> Class<T> get(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getIncomingMessageValueClass();
+    }
+    @Override
+    public <T extends Writable> ValueFactory<T> factory(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getIncomingMessageValueFactory();
+    }
+  },
+  /** Outgoing message value */
+  OUTGOING_MESSAGE_VALUE {
+    @Override
+    public ClassConfOption<? extends Writable> writableConfOption() {
+      return GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
+    }
+    @Override
+    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
+      return GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS;
+    }
+    @Override
+    public <T extends Writable> Class<T> get(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getOutgoingMessageValueClass();
+    }
+    @Override
+    public <T extends Writable> ValueFactory<T> factory(
+        ImmutableClassesGiraphConfiguration conf) {
+      return conf.getOutgoingMessageValueFactory();
+    }
+  };
+
+  @Override
+  public String toString() {
+    return name().toLowerCase();
+  }
+
+  /**
+   * Name of graph type with dots, like vertex.id or vertex.value
+   *
+   * @return name dot string
+   */
+  public String dotString() {
+    return toString().replaceAll("_", ".");
+  }
+
+  /**
+   * Name of graph type with spaces, like "vertex id" or "vertex value"
+   *
+   * @return name space string
+   */
+  public String spaceString() {
+    return toString().replaceAll("_", " ");
+  }
+
+  /**
+   * Get class set by user for this option
+   *
+   * @param <T> writable type
+   * @param conf Configuration
+   * @return Class for graph type
+   */
+  public abstract <T extends Writable> Class<T> get(
+      ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Get factory for creating this graph type
+   *
+   * @param <T> writable type
+   * @param conf Configuration
+   * @return Factory for this graph type
+   */
+  public abstract <T extends Writable> ValueFactory<T> factory(
+      ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Get the option for the factory that creates this graph type
+   *
+   * @param <T> factory type
+   * @return Factory class option
+   */
+  public abstract <T extends ValueFactory> ClassConfOption<T>
+  factoryClassOption();
+
+  /**
+   * Get the class option for this graph type
+   *
+   * @param <T> writable type
+   * @return ClassConfOption option
+   */
+  public abstract <T extends Writable> ClassConfOption<T> writableConfOption();
+
+  /**
+   * Get interface class (Writable or WritableComparable) for this graph type
+   *
+   * @param <T> writable type
+   * @return interface Writable class
+   */
+  public <T extends Writable> Class<T> interfaceClass() {
+    return this.<T>writableConfOption().getInterfaceClass();
+  }
+
+  /**
+   * Get class set by user for this option
+   *
+   * @param <T> writable type
+   * @param conf Configuration
+   * @return Class for graph type
+   */
+  public <T extends Writable> Class<? extends T> get(Configuration conf) {
+    if (conf instanceof ImmutableClassesGiraphConfiguration) {
+      ImmutableClassesGiraphConfiguration icgc =
+          (ImmutableClassesGiraphConfiguration) conf;
+      return get(icgc);
+    }
+    return this.<T>writableConfOption().get(conf);
+  }
+
+  /**
+   * Create a new instance of this value type from configuration
+   *
+   * @param <T> writable type
+   * @param conf {@link ImmutableClassesGiraphConfiguration}
+   * @return new instance of this graph value type
+   */
+  public <T extends Writable> T newInstance(
+      ImmutableClassesGiraphConfiguration conf) {
+    return (T) factory(conf).newInstance();
+  }
+}


Mime
View raw message