giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edu...@apache.org
Subject [3/3] git commit: updated refs/heads/trunk to 5d0b81a
Date Fri, 10 Apr 2015 19:20:29 GMT
[GIRAPH-1002] Improve message changing through iters

Summary:
Add MessageClasses object to hold all information about single
message class (factory, combiner, store type)

Made it so that it can be changed as a whole, allowing
any complete control over messages.

(i.e. message factory couldn't be used when messages where changing)

Incoming message type configs are completely useless. It needs to be the same
as outgoing message type in the previous step. So deleting that completely.

*breaking change* Removed initialize and getValueClass from value factories.
Initialize can be achieved, as with everywhere else, by implementing
GiraphConfigurationSettable. getValueClass was not used anywhere.

This way - value factory has only one function - and lambda can be passed to it.

Test Plan: mvn clean install -Phadoop_facebook

Reviewers: sergey.edunov, dionysis.logothetis, avery.ching, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D36849


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5d0b81ac
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5d0b81ac
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5d0b81ac

Branch: refs/heads/trunk
Commit: 5d0b81ac42ab0de80bb68b87c7d1c1dd710f1108
Parents: 6ee97e7
Author: Igor Kabiljo <ikabiljo@fb.com>
Authored: Fri Apr 10 11:42:28 2015 -0700
Committer: Sergey Edunov <edunov@fb.com>
Committed: Fri Apr 10 12:18:55 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 findbugs-exclude.xml                            |   5 +
 .../apache/giraph/comm/SendMessageCache.java    |  12 +-
 .../giraph/comm/SendOneMessageToManyCache.java  |  10 +-
 .../java/org/apache/giraph/comm/ServerData.java |   4 +-
 .../ByteArrayMessagesPerVertexStore.java        |  10 +-
 .../messages/InMemoryMessageStoreFactory.java   |  47 ++--
 .../comm/messages/MessageStoreFactory.java      |  10 +-
 .../comm/messages/OneMessagePerVertexStore.java |  12 +-
 .../out_of_core/DiskBackedMessageStore.java     |  27 +-
 .../DiskBackedMessageStoreFactory.java          |   6 +-
 .../PartitionDiskBackedMessageStore.java        |  18 +-
 .../out_of_core/SequentialFileMessageStore.java |   6 +-
 .../primitives/IdOneMessagePerVertexStore.java  |   4 +-
 .../NettyWorkerClientRequestProcessor.java      |  27 +-
 .../SendPartitionCurrentMessagesRequest.java    |   5 +-
 .../requests/SendWorkerMessagesRequest.java     |   8 +-
 .../SendWorkerOneMessageToManyRequest.java      |   6 +-
 .../giraph/conf/DefaultMessageClasses.java      | 203 +++++++++++++++
 .../org/apache/giraph/conf/GiraphClasses.java   | 135 ++++------
 .../apache/giraph/conf/GiraphConfiguration.java |  37 +--
 .../org/apache/giraph/conf/GiraphConstants.java |  21 +-
 .../org/apache/giraph/conf/GiraphTypes.java     |  46 +---
 .../ImmutableClassesGiraphConfiguration.java    | 148 ++++++-----
 .../org/apache/giraph/conf/MessageClasses.java  |  85 ++++++
 .../apache/giraph/conf/PerGraphTypeBoolean.java |  19 --
 .../conf/PerGraphTypeBooleanConfOption.java     |  10 -
 .../apache/giraph/conf/PerGraphTypeEnum.java    |  19 --
 .../giraph/conf/PerGraphTypeEnumConfOption.java |  10 -
 .../factories/AbstractMessageValueFactory.java  |  67 -----
 .../factories/DefaultEdgeValueFactory.java      |  10 +-
 .../DefaultIncomingMessageValueFactory.java     |  34 ---
 .../factories/DefaultMessageValueFactory.java   |  58 +++++
 .../DefaultOutgoingMessageValueFactory.java     |  34 ---
 .../factories/DefaultVertexIdFactory.java       |  10 +-
 .../factories/DefaultVertexValueFactory.java    |  10 +-
 .../factories/TestMessageValueFactory.java      |   8 -
 .../apache/giraph/factories/ValueFactories.java |  38 +--
 .../apache/giraph/factories/ValueFactory.java   |  24 +-
 .../apache/giraph/graph/ComputeCallable.java    |  16 +-
 .../java/org/apache/giraph/graph/GraphType.java |  23 +-
 .../job/GiraphConfigurationValidator.java       |  26 +-
 .../org/apache/giraph/jython/JythonOptions.java |   4 -
 .../jython/factories/JythonFactoryBase.java     |  13 +-
 .../JythonIncomingMessageValueFactory.java      |  35 ---
 .../factories/JythonMessageValueFactory.java    |   5 -
 .../apache/giraph/master/BspServiceMaster.java  |  12 +-
 .../org/apache/giraph/master/MasterCompute.java |  16 ++
 .../apache/giraph/master/SuperstepClasses.java  | 257 ++++++++++---------
 .../java/org/apache/giraph/types/NoMessage.java |  42 +++
 .../giraph/utils/ByteArrayVertexIdMessages.java |  12 +-
 .../apache/giraph/utils/ReflectionUtils.java    |  46 ++++
 .../apache/giraph/worker/BspServiceWorker.java  |   9 +-
 .../giraph/worker/InputSplitsCallable.java      |  13 +-
 .../apache/giraph/comm/TestMessageStores.java   |  62 +++--
 .../apache/giraph/conf/TestObjectCreation.java  |  14 +-
 .../apache/giraph/io/TestVertexEdgeInput.java   |  22 +-
 .../master/TestComputationCombinerTypes.java    |  77 +++---
 .../apache/giraph/master/TestSwitchClasses.java |   3 -
 .../giraph/utils/TestReflectionUtils.java       |  20 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   2 +-
 .../giraph/vertex/TestComputationTypes.java     |  29 +--
 .../giraph/hive/jython/HiveJythonUtils.java     |  73 +++---
 63 files changed, 1075 insertions(+), 1001 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 61cb3ce..30704e8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-1002: Improve message changing through iters (ikabiljo via edunov)
+
   GIRAPH-998: Close writers in parallel (majaakbiljo)
 
   GIRAPH-999: Add support for Mapping multi-input formats (dlogothetis via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 9ac4412..0c2ab96 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -96,4 +96,9 @@
     <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$GetPartition"/>
     <Bug pattern="UL_UNRELEASED_LOCK"/>
   </Match>
+  <Match>
+    <!-- Java Serialization is not used, so this is never an actual issue.
+      On the other hand, Kryo needs lambdas to be Serializable to work. -->
+    <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index b1fec01..e101b01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -18,6 +18,9 @@
 
 package org.apache.giraph.comm;
 
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+
 import java.util.Iterator;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -28,17 +31,14 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
-
 /**
  * Aggregates the messages to be sent to workers so they can be sent
  * in bulk.  Not thread-safe.
@@ -81,7 +81,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
   @Override
   public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+        getConf().<M>createOutgoingMessageValueFactory());
   }
 
   /**
@@ -177,7 +177,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    */
   private class TargetVertexIdIterator implements Iterator<I> {
     /** An edge iterator */
-    private Iterator<Edge<I, Writable>> edgesIterator;
+    private final Iterator<Edge<I, Writable>> edgesIterator;
 
     /**
      * Constructor.

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
index c67a20b..c00e560 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
@@ -22,24 +22,24 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 /**
  * Aggregates the messages to be sent to workers so they can be sent
  * in bulk.
@@ -145,7 +145,7 @@ public class SendOneMessageToManyCache<I extends WritableComparable,
       msgVidsCache[workerInfo.getTaskId()];
     if (workerData == null) {
       workerData = new ByteArrayOneMessageToManyIds<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+          getConf().<M>createOutgoingMessageValueFactory());
       workerData.setConf(getConf());
       workerData.initialize(getSendWorkerInitialBufferSize(
         workerInfo.getTaskId()));

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 1fd85e4..129df59 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -191,9 +191,9 @@ public class ServerData<I extends WritableComparable,
     }
     currentMessageStore =
         incomingMessageStore != null ? incomingMessageStore :
-            messageStoreFactory.newStore(conf.getIncomingMessageValueFactory());
+            messageStoreFactory.newStore(conf.getIncomingMessageClasses());
     incomingMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory());
+        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
     // finalize current message-store before resolving mutations
     currentMessageStore.finalizeStore();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 57d255f..dfb5683 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -25,13 +25,14 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.RepresentativeByteStructIterator;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.RepresentativeByteStructIterator;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -209,8 +210,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
     @Override
     public MessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory,
+        MessageClasses<I, M> messageClasses) {
+      return new ByteArrayMessagesPerVertexStore<I, M>(
+          messageClasses.createMessageValueFactory(config),
           service, config);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index ae86c56..27980a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -30,6 +30,7 @@ import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessage
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 import org.apache.giraph.types.ops.TypeOpsUtils;
@@ -72,37 +73,38 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   /**
    * MessageStore to be used when combiner is enabled
    *
+   * @param messageClass message class
    * @param messageValueFactory message value factory
+   * @param messageCombiner message combiner
    * @return message store
    */
   protected MessageStore<I, M> newStoreWithCombiner(
-    MessageValueFactory<M> messageValueFactory) {
-    Class<M> messageClass = messageValueFactory.getValueClass();
+      Class<M> messageClass,
+      MessageValueFactory<M> messageValueFactory,
+      MessageCombiner<? super I, M> messageCombiner) {
     MessageStore messageStore;
     Class<I> vertexIdClass = conf.getVertexIdClass();
     if (vertexIdClass.equals(IntWritable.class) &&
         messageClass.equals(FloatWritable.class)) {
       messageStore = new IntFloatMessageStore(
           (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
-          (MessageCombiner<IntWritable, FloatWritable>)
-              conf.<FloatWritable>createMessageCombiner());
+          (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
     } else if (vertexIdClass.equals(LongWritable.class) &&
         messageClass.equals(DoubleWritable.class)) {
       messageStore = new LongDoubleMessageStore(
           (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
-          (MessageCombiner<LongWritable, DoubleWritable>)
-              conf.<DoubleWritable>createMessageCombiner());
+          (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
     } else {
       PrimitiveIdTypeOps<I> idTypeOps =
           TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
       if (idTypeOps != null) {
         messageStore = new IdOneMessagePerVertexStore<>(
-            messageValueFactory, service, conf.<M>createMessageCombiner(),
+            messageValueFactory, service, messageCombiner,
             conf);
       } else {
         messageStore =
             new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-                conf.<M>createMessageCombiner(), conf);
+                messageCombiner, conf);
       }
     }
     return messageStore;
@@ -111,14 +113,16 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   /**
    * MessageStore to be used when combiner is not enabled
    *
+   * @param messageClass message class
    * @param messageValueFactory message value factory
+   * @param encodeAndStore message encode and store type
    * @return message store
    */
   protected MessageStore<I, M> newStoreWithoutCombiner(
-    MessageValueFactory<M> messageValueFactory) {
+      Class<M> messageClass,
+      MessageValueFactory<M> messageValueFactory,
+      MessageEncodeAndStoreType encodeAndStore) {
     MessageStore messageStore = null;
-    MessageEncodeAndStoreType encodeAndStore = GiraphConstants
-        .MESSAGE_ENCODE_AND_STORE_TYPE.get(conf);
     Class<I> vertexIdClass = conf.getVertexIdClass();
     if (vertexIdClass.equals(IntWritable.class)) { // INT
       messageStore = new IntByteArrayMessageStore(messageValueFactory,
@@ -160,21 +164,28 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
 
   @Override
   public MessageStore<I, M> newStore(
-      MessageValueFactory<M> messageValueFactory) {
-    Class<M> messageClass = messageValueFactory.getValueClass();
+      MessageClasses<I, M> messageClasses) {
+    Class<M> messageClass = messageClasses.getMessageClass();
+    MessageValueFactory<M> messageValueFactory =
+        messageClasses.createMessageValueFactory(conf);
+    MessageCombiner<? super I, M> messageCombiner =
+        messageClasses.createMessageCombiner(conf);
     MessageStore messageStore;
-    if (conf.useMessageCombiner()) {
-      messageStore = newStoreWithCombiner(messageValueFactory);
+    if (messageCombiner != null) {
+      messageStore = newStoreWithCombiner(
+          messageClass, messageValueFactory, messageCombiner);
     } else {
-      messageStore = newStoreWithoutCombiner(messageValueFactory);
+      messageStore = newStoreWithoutCombiner(
+          messageClass, messageValueFactory,
+          messageClasses.getMessageEncodeAndStoreType());
     }
 
     if (LOG.isInfoEnabled()) {
       LOG.info("newStore: Created " + messageStore.getClass() +
           " for vertex id " + conf.getVertexIdClass() +
           " and message value " + messageClass + " and" +
-          (conf.useMessageCombiner() ? " message combiner " +
-              conf.getMessageCombinerClass() : " no combiner"));
+          (messageCombiner != null ? " message combiner " +
+              messageCombiner.getClass() : " no combiner"));
     }
 
     int asyncMessageStoreThreads =

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 6149a9c..41076e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -36,14 +36,10 @@ public interface MessageStoreFactory<I extends WritableComparable,
   /**
    * Creates new message store.
    *
-   * Note: MessageCombiner class in Configuration can be changed,
-   * this method should return MessageStore which uses current combiner
-   *
-   *
-   * @param messageValueFactory Message class held in the store
+   * @param messageClasses Message classes information to be held in the store
    * @return New message store
    */
-  MS newStore(MessageValueFactory<M> messageValueFactory);
+  MS newStore(MessageClasses<I, M> messageClasses);
 
   /**
    * Implementation class should use this method of initialization

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 349e58b..ad0a5dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -44,7 +45,7 @@ import org.apache.hadoop.io.WritableComparable;
 public class OneMessagePerVertexStore<I extends WritableComparable,
     M extends Writable> extends SimpleMessageStore<I, M, M> {
   /** MessageCombiner for messages */
-  private final MessageCombiner<I, M> messageCombiner;
+  private final MessageCombiner<? super I, M> messageCombiner;
 
   /**
    * @param messageValueFactory Message class held in the store
@@ -55,7 +56,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   public OneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      MessageCombiner<I, M> messageCombiner,
+      MessageCombiner<? super I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     super(messageValueFactory, service, config);
     this.messageCombiner =
@@ -162,9 +163,10 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
 
     @Override
     public MessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-          config.<M>createMessageCombiner(), config);
+        MessageClasses<I, M> messageClasses) {
+      return new OneMessagePerVertexStore<I, M>(
+          messageClasses.createMessageValueFactory(config), service,
+          messageClasses.createMessageCombiner(config), config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 3000cd4..3351051 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -18,22 +18,23 @@
 
 package org.apache.giraph.comm.messages.out_of_core;
 
-import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Maps;
 
 /**
  * Message store which separates data by partitions,
@@ -48,7 +49,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     MessageStore<I, M> {
   /** Message value factory */
-  private final MessageValueFactory<M> messageValueFactory;
+  private final MessageClasses<I, M> messageClasses;
   /** Service worker */
   private final CentralizedServiceWorker<I, V, E> service;
   /** Number of messages to keep in memory */
@@ -63,19 +64,19 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   /**
    * Constructor
    *
-   * @param messageValueFactory         Factory for creating message values
+   * @param messageClasses              Message classes information
    * @param service                     Service worker
    * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
    * @param partitionStoreFactory       Factory for creating stores for a
    *                                    partition
    */
   public DiskBackedMessageStore(
-      MessageValueFactory<M> messageValueFactory,
+      MessageClasses<I, M> messageClasses,
       CentralizedServiceWorker<I, V, E> service,
       int maxNumberOfMessagesInMemory,
       MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
           M>> partitionStoreFactory) {
-    this.messageValueFactory = messageValueFactory;
+    this.messageClasses = messageClasses;
     this.service = service;
     this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
     this.partitionStoreFactory = partitionStoreFactory;
@@ -238,7 +239,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     if (messageStore != null) {
       return messageStore;
     }
-    messageStore = partitionStoreFactory.newStore(messageValueFactory);
+    messageStore = partitionStoreFactory.newStore(messageClasses);
     PartitionDiskBackedMessageStore<I, M> store =
         partitionMessageStores.putIfAbsent(partitionId, messageStore);
     return (store == null) ? messageStore : store;
@@ -260,7 +261,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
       PartitionDiskBackedMessageStore<I, M> messageStore =
-          partitionStoreFactory.newStore(messageValueFactory);
+          partitionStoreFactory.newStore(messageClasses);
       messageStore.readFields(in);
       partitionMessageStores.put(partitionId, messageStore);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
index f2b31c0..728a2ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
@@ -24,7 +24,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -70,8 +70,8 @@ public class DiskBackedMessageStoreFactory<I extends WritableComparable,
 
   @Override
   public MessageStore<I, M>
-  newStore(MessageValueFactory<M> messageValueFactory) {
-    return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
+  newStore(MessageClasses<I, M> messageClasses) {
+    return new DiskBackedMessageStore<I, V, E, M>(messageClasses,
         service, maxMessagesInMemory, fileStoreFactory);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index bece774..698281f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -35,6 +35,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
@@ -53,6 +54,8 @@ import com.google.common.collect.Maps;
  */
 public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     M extends Writable> implements Writable {
+  /** Message classes */
+  private final MessageClasses<I, M> messageClasses;
   /** Message value factory */
   private final MessageValueFactory<M> messageValueFactory;
   /**
@@ -78,17 +81,18 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
   /**
    * Constructor.
    *
-   * @param messageValueFactory Used to create message values
+   * @param messageClasses      Message classes information
    * @param config              Hadoop configuration
    * @param fileStoreFactory    Factory for creating file stores when flushing
    */
   public PartitionDiskBackedMessageStore(
-      MessageValueFactory<M> messageValueFactory,
+      MessageClasses<I, M> messageClasses,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
           fileStoreFactory) {
     inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
-    this.messageValueFactory = messageValueFactory;
+    this.messageClasses = messageClasses;
+    this.messageValueFactory = messageClasses.createMessageValueFactory(config);
     this.config = config;
     numberOfMessagesInMemory = new AtomicInteger(0);
     destinationVertices =
@@ -227,7 +231,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
       rwLock.writeLock().unlock();
     }
     SequentialFileMessageStore<I, M> fileStore =
-        fileStoreFactory.newStore(messageValueFactory);
+        fileStoreFactory.newStore(messageClasses);
     fileStore.addMessages(messagesToFlush);
 
     synchronized (fileStores) {
@@ -287,7 +291,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     int numFileStores = in.readInt();
     for (int s = 0; s < numFileStores; s++) {
       SequentialFileMessageStore<I, M> fileStore =
-          fileStoreFactory.newStore(messageValueFactory);
+          fileStoreFactory.newStore(messageClasses);
       fileStore.readFields(in);
       fileStores.add(fileStore);
     }
@@ -341,8 +345,8 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
 
     @Override
     public PartitionDiskBackedMessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new PartitionDiskBackedMessageStore<I, M>(messageValueFactory,
+        MessageClasses<I, M> messageClasses) {
+      return new PartitionDiskBackedMessageStore<I, M>(messageClasses,
           config, fileStoreFactory);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 5988459..8f589bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -42,6 +42,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.io.DataInputOutput;
@@ -407,11 +408,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
 
     @Override
     public SequentialFileMessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
+        MessageClasses<I, M> messageClasses) {
       int idx = Math.abs(storeCounter.getAndIncrement());
       String fileName =
           directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(messageValueFactory, config,
+      return new SequentialFileMessageStore<I, M>(
+          messageClasses.createMessageValueFactory(config), config,
           bufferSize, fileName);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index c72bedf..1d2407c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -62,7 +62,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   /** Message value factory */
   private final MessageValueFactory<M> messageValueFactory;
   /** Message messageCombiner */
-  private final MessageCombiner<I, M> messageCombiner;
+  private final MessageCombiner<? super I, M> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<I, ?, ?> service;
   /** Giraph configuration */
@@ -95,7 +95,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   public IdOneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      MessageCombiner<I, M> messageCombiner,
+      MessageCombiner<? super I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     this.service = service;
     this.config = config;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index f762f46..a64a33e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,15 +17,16 @@
  */
 package org.apache.giraph.comm.netty;
 
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.SendOneMessageToManyCache;
 import org.apache.giraph.comm.SendEdgeCache;
 import org.apache.giraph.comm.SendMessageCache;
 import org.apache.giraph.comm.SendMutationsCache;
+import org.apache.giraph.comm.SendOneMessageToManyCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
@@ -58,9 +59,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
 
 /**
  * Aggregate requests and sends them to the thread-safe NettyClient.  This
@@ -89,8 +90,6 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
   private final WorkerClient<I, V, E> workerClient;
-  /** Messages sent during the last superstep */
-  private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
   /** Maximum size of vertices per remote worker to cache before sending. */
@@ -118,11 +117,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param context Context
    * @param conf Configuration
    * @param serviceWorker Service worker
+   * @param useOneMessageToManyIdsEncoding should use one message to many
    */
   public NettyWorkerClientRequestProcessor(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
+      CentralizedServiceWorker<I, V, E> serviceWorker,
+      boolean useOneMessageToManyIdsEncoding) {
     this.workerClient = serviceWorker.getWorkerClient();
     this.configuration = conf;
 
@@ -134,7 +135,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
     maxVerticesSizePerWorker =
         GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
-    if (this.configuration.useOneMessageToManyIdsEncoding()) {
+    if (useOneMessageToManyIdsEncoding) {
       sendMessageCache =
         new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
           this, maxMessagesSizePerWorker);
@@ -205,7 +206,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(
-            configuration.getOutgoingMessageValueFactory());
+            configuration.createOutgoingMessageValueFactory());
     vertexIdMessages.setConf(configuration);
     vertexIdMessages.initialize();
     for (I vertexId :
@@ -228,7 +229,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         doRequest(workerInfo, messagesRequest);
         vertexIdMessages =
             new ByteArrayVertexIdMessages<I, Writable>(
-                configuration.getOutgoingMessageValueFactory());
+                configuration.createOutgoingMessageValueFactory());
         vertexIdMessages.setConf(configuration);
         vertexIdMessages.initialize();
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index c74b4f5..b59d0cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
@@ -69,8 +70,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
     partitionId = input.readInt();
     // At this moment the Computation class have already been replaced with
     // the new one, and we deal with messages from previous superstep
-    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>(
-        getConf().<M>getIncomingMessageValueFactory());
+    vertexIdMessageMap = new ByteArrayVertexIdMessages<>(
+        getConf().<M>createIncomingMessageValueFactory());
     vertexIdMessageMap.setConf(getConf());
     vertexIdMessageMap.initialize();
     vertexIdMessageMap.readFields(input);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index d525164..6953998 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.IOException;
+
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-
 /**
  * Send a collection of vertex messages for a partition.
  *
@@ -56,7 +56,7 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
   @Override
   public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+        getConf().createOutgoingMessageValueFactory());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
index 798ddfa..f8d0473 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -74,8 +74,8 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
 
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
-    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<I, M>(
-      getConf().<M>getOutgoingMessageValueFactory());
+    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
+        getConf().<M>createOutgoingMessageValueFactory());
     oneMessageToManyIds.setConf(getConf());
     oneMessageToManyIds.readFields(input);
   }
@@ -132,7 +132,7 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
               .get(partitionId);
           if (idMsgs == null) {
             idMsgs = new ByteArrayVertexIdMessages<>(
-                getConf().<M>getOutgoingMessageValueFactory());
+                getConf().<M>createOutgoingMessageValueFactory());
             idMsgs.setConf(getConf());
             idMsgs.initialize(initialSize);
             partitionIdMsgs.put(partitionId, idMsgs);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
new file mode 100644
index 0000000..b6e5169
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.python.google.common.base.Preconditions;
+
+/**
+ * Default implementation of MessageClasses
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class DefaultMessageClasses
+    <I extends WritableComparable, M extends Writable>
+    implements MessageClasses<I, M> {
+  /** message class */
+  private Class<M> messageClass;
+  /** message value factory class */
+  private Class<? extends MessageValueFactory<M>>
+  messageValueFactoryClass;
+  /** message combiner class */
+  private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
+  /** whether message class was manually modified in this superstep */
+  private boolean messageClassModified;
+  /** message encode and store type */
+  private MessageEncodeAndStoreType messageEncodeAndStoreType;
+
+  /** Constructor */
+  public DefaultMessageClasses() {
+  }
+
+  /**
+   * Constructor
+   * @param messageClass message class
+   * @param messageValueFactoryClass message value factory class
+   * @param messageCombinerClass message combiner class
+   * @param messageEncodeAndStoreType message encode and store type
+   */
+  public DefaultMessageClasses(
+      Class<M> messageClass,
+      Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
+      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
+        MessageEncodeAndStoreType messageEncodeAndStoreType) {
+    this.messageClass = messageClass;
+    this.messageValueFactoryClass = messageValueFactoryClass;
+    this.messageCombinerClass = messageCombinerClass;
+    this.messageClassModified = false;
+    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+  }
+
+  @Override
+  public Class<M> getMessageClass() {
+    return messageClass;
+  }
+
+  @Override
+  public MessageValueFactory<M> createMessageValueFactory(
+      ImmutableClassesGiraphConfiguration conf) {
+    if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
+      return new DefaultMessageValueFactory<>(messageClass, conf);
+    }
+
+    MessageValueFactory factory = ReflectionUtils.newInstance(
+        messageValueFactoryClass, conf);
+    if (!factory.newInstance().getClass().equals(messageClass)) {
+      throw new IllegalStateException("Message factory " +
+        messageValueFactoryClass + " creates " +
+        factory.newInstance().getClass().getName() + ", but message type is " +
+        messageClass.getName());
+    }
+    return factory;
+  }
+
+  @Override
+  public MessageCombiner<? super I, M> createMessageCombiner(
+      ImmutableClassesGiraphConfiguration conf) {
+    if (messageCombinerClass == null) {
+      return null;
+    }
+
+    MessageCombiner combiner =
+        ReflectionUtils.newInstance(messageCombinerClass, conf);
+    if (combiner != null) {
+      Preconditions.checkState(
+          combiner.createInitialMessage().getClass().equals(messageClass));
+    }
+    return combiner;
+  }
+
+  @Override
+  public boolean useMessageCombiner() {
+    return messageCombinerClass != null;
+  }
+
+  @Override
+  public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
+    return messageEncodeAndStoreType;
+  }
+
+  @Override
+  public MessageClasses<I, M> createCopyForNewSuperstep() {
+    return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
+        messageCombinerClass, messageEncodeAndStoreType);
+  }
+
+  @Override
+  public void verifyConsistent(
+      ImmutableClassesGiraphConfiguration conf) {
+    Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
+        MessageValueFactory.class, messageValueFactoryClass);
+    ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
+        "Message factory", messageValueFactoryClass);
+
+    if (messageCombinerClass != null) {
+      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
+          MessageCombiner.class, messageCombinerClass);
+      ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
+          "Vertex id", messageCombinerClass);
+      ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
+          "Outgoing message", messageCombinerClass);
+    }
+  }
+
+  /**
+   * Set message class
+   * @param messageClass message classs
+   */
+  public void setMessageClass(Class<M> messageClass) {
+    this.messageClassModified = true;
+    this.messageClass = messageClass;
+  }
+
+  /**
+   * Set message class if not set already in this superstep
+   * @param messageClass message class
+   */
+  public void setIfNotModifiedMessageClass(Class<M> messageClass) {
+    if (!messageClassModified) {
+      this.messageClass = messageClass;
+    }
+  }
+
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
+    this.messageCombinerClass = messageCombinerClass;
+  }
+
+  public void setMessageValueFactoryClass(
+      Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
+    this.messageValueFactoryClass = messageValueFactoryClass;
+  }
+
+  public void setMessageEncodeAndStoreType(
+      MessageEncodeAndStoreType messageEncodeAndStoreType) {
+    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeClass(messageClass, out);
+    WritableUtils.writeClass(messageValueFactoryClass, out);
+    WritableUtils.writeClass(messageCombinerClass, out);
+    out.writeBoolean(messageClassModified);
+    out.writeByte(messageEncodeAndStoreType.ordinal());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    messageClass = WritableUtils.readClass(in);
+    messageValueFactoryClass = WritableUtils.readClass(in);
+    messageCombinerClass = WritableUtils.readClass(in);
+    messageClassModified = in.readBoolean();
+    messageEncodeAndStoreType =
+        messageEncodeAndStoreType.values()[in.readByte()];
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 2f3c43a..88d5277 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -19,17 +19,18 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
-import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.MappingInputFormat;
@@ -45,6 +46,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.types.NoMessage;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
@@ -99,9 +101,11 @@ public class GiraphClasses<I extends WritableComparable,
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
-  /** Message combiner class - cached for fast access */
-  protected Class<? extends MessageCombiner<I, ? extends Writable>>
-  messageCombinerClass;
+
+  /** Incoming message classes */
+  protected MessageClasses<I, ? extends Writable> incomingMessageClasses;
+  /** Outgoing message classes */
+  protected MessageClasses<I, ? extends Writable> outgoingMessageClasses;
 
   /** Vertex resolver class - cached for fast access */
   protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
@@ -124,32 +128,19 @@ public class GiraphClasses<I extends WritableComparable,
    * Empty constructor. Initialize with default classes or null.
    */
   public GiraphClasses() {
-    // Note: the cast to Object is required in order for javac to accept the
-    // downcast.
-    computationFactoryClass = (Class<? extends ComputationFactory<I, V, E,
-          ? extends Writable, ? extends Writable>>) (Object)
-        DefaultComputationFactory.class;
+    computationFactoryClass = (Class) DefaultComputationFactory.class;
     giraphTypes = new GiraphTypes<I, V, E>();
-    outEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
-        ByteArrayEdges.class;
-    inputOutEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
-        ByteArrayEdges.class;
-    graphPartitionerFactoryClass =
-        (Class<? extends GraphPartitionerFactory<I, V, E>>) (Object)
-            HashPartitionerFactory.class;
+    outEdgesClass = (Class) ByteArrayEdges.class;
+    inputOutEdgesClass = (Class) ByteArrayEdges.class;
+    graphPartitionerFactoryClass = (Class) HashPartitionerFactory.class;
     aggregatorWriterClass = TextAggregatorWriter.class;
-    vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
-        (Object) DefaultVertexResolver.class;
-    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
-        (Object) DefaultVertexValueCombiner.class;
+    vertexResolverClass = (Class) DefaultVertexResolver.class;
+    vertexValueCombinerClass = (Class) DefaultVertexValueCombiner.class;
     workerContextClass = DefaultWorkerContext.class;
     masterComputeClass = DefaultMasterCompute.class;
-    partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
-        SimplePartition.class;
-    edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
-        (Object) DefaultEdgeInputFilter.class;
-    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
-        (Object) DefaultVertexInputFilter.class;
+    partitionClass = (Class) SimplePartition.class;
+    edgeInputFilterClass = (Class) DefaultEdgeInputFilter.class;
+    vertexInputFilterClass = (Class) DefaultVertexInputFilter.class;
   }
 
   /**
@@ -190,9 +181,21 @@ public class GiraphClasses<I extends WritableComparable,
         MAPPING_INPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
-    messageCombinerClass =
-        (Class<? extends MessageCombiner<I, ? extends Writable>>)
-        MESSAGE_COMBINER_CLASS.get(conf);
+
+    // incoming messages shouldn't be used in first iteration at all
+    // but empty message stores are created, etc, so using NoMessage
+    // to enforce not a single message is read/written
+    incomingMessageClasses = new DefaultMessageClasses(
+        NoMessage.class,
+        DefaultMessageValueFactory.class,
+        null,
+        MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
+    outgoingMessageClasses = new DefaultMessageClasses(
+        giraphTypes.getInitialOutgoingMessageValueClass(),
+        OUTGOING_MESSAGE_VALUE_FACTORY_CLASS.get(conf),
+        MESSAGE_COMBINER_CLASS.get(conf),
+        MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
+
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         VERTEX_RESOLVER_CLASS.get(conf);
     vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
@@ -266,24 +269,15 @@ public class GiraphClasses<I extends WritableComparable,
     return giraphTypes.getEdgeValueClass();
   }
 
-  /**
-   * Get incoming Message Value class - messages which have been sent in the
-   * previous superstep and are processed in the current one
-   *
-   * @return Message Value class
-   */
-  public Class<? extends Writable> getIncomingMessageValueClass() {
-    return giraphTypes.getIncomingMessageValueClass();
+
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getIncomingMessageClasses() {
+    return incomingMessageClasses;
   }
 
-  /**
-   * Get outgoing Message Value class - messages which are going to be sent
-   * during current superstep
-   *
-   * @return Message Value class
-   */
-  public Class<? extends Writable> getOutgoingMessageValueClass() {
-    return giraphTypes.getOutgoingMessageValueClass();
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getOutgoingMessageClasses() {
+    return outgoingMessageClasses;
   }
 
   /**
@@ -432,25 +426,6 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Check if MessageCombiner is set
-   *
-   * @return true if MessageCombiner is set
-   */
-  public boolean hasMessageCombinerClass() {
-    return messageCombinerClass != null;
-  }
-
-  /**
-   * Get MessageCombiner used
-   *
-   * @return MessageCombiner
-   */
-  public Class<? extends MessageCombiner<I, ? extends Writable>>
-  getMessageCombinerClass() {
-    return messageCombinerClass;
-  }
-
-  /**
    * Check if VertexResolver is set
    *
    * @return true if VertexResolver is set
@@ -581,12 +556,12 @@ public class GiraphClasses<I extends WritableComparable,
    * Set incoming Message Value class held - messages which have been sent in
    * the previous superstep and are processed in the current one
    *
-   * @param incomingMessageValueClass Message Value class to set
+   * @param incomingMessageClasses Message classes value to set
    * @return this
    */
-  public GiraphClasses setIncomingMessageValueClass(
-      Class<? extends Writable> incomingMessageValueClass) {
-    giraphTypes.setIncomingMessageValueClass(incomingMessageValueClass);
+  public GiraphClasses setIncomingMessageClasses(
+      MessageClasses<I, ? extends Writable> incomingMessageClasses) {
+    this.incomingMessageClasses = incomingMessageClasses;
     return this;
   }
 
@@ -594,12 +569,12 @@ public class GiraphClasses<I extends WritableComparable,
    * Set outgoing Message Value class held - messages which are going to be sent
    * during current superstep
    *
-   * @param outgoingMessageValueClass Message Value class to set
+   * @param outgoingMessageClasses Message classes value to set
    * @return this
    */
-  public GiraphClasses setOutgoingMessageValueClass(
-      Class<? extends Writable> outgoingMessageValueClass) {
-    giraphTypes.setOutgoingMessageValueClass(outgoingMessageValueClass);
+  public GiraphClasses setOutgoingMessageClasses(
+      MessageClasses<I, ? extends Writable> outgoingMessageClasses) {
+    this.outgoingMessageClasses = outgoingMessageClasses;
     return this;
   }
 
@@ -704,18 +679,6 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Set MessageCombiner class used
-   *
-   * @param combinerClass MessageCombiner class to set
-   * @return this
-   */
-  public GiraphClasses setMessageCombiner(
-      Class<? extends MessageCombiner<I, ? extends Writable>> combinerClass) {
-    this.messageCombinerClass = combinerClass;
-    return this;
-  }
-
-  /**
    * Set VertexResolver used
    *
    * @param vertexResolverClass VertexResolver to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index a315399..a395244 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -18,17 +18,23 @@
 
 package org.apache.giraph.conf;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.net.UnknownHostException;
+
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
 import org.apache.giraph.factories.ComputationFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexValueCombiner;
-import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.MappingInputFormat;
@@ -50,12 +56,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.net.DNS;
 
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-
-import java.net.UnknownHostException;
-
 /**
  * Adds user methods specific to Giraph.  This will be put into an
  * ImmutableClassesGiraphConfiguration that provides the configuration plus
@@ -527,15 +527,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the message combiner class (optional)
-   *
-   * @return messageCombinerClass Determines how vertex messages are combined
-   */
-  public Class<? extends MessageCombiner> getMessageCombinerClass() {
-    return MESSAGE_COMBINER_CLASS.get(this);
-  }
-
-  /**
    * Set the message combiner class (optional)
    *
    * @param messageCombinerClass Determines how vertex messages are combined
@@ -1208,16 +1199,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Return if oneMessageToManyIds encoding can be enabled
-   *
-   * @return True if this option is true.
-   */
-  public boolean useOneMessageToManyIdsEncoding() {
-    return MESSAGE_ENCODE_AND_STORE_TYPE.get(this)
-      .useOneMessageToManyIdsEncoding();
-  }
-
-  /**
    * Get option whether to create a source vertex present only in edge input
    *
    * @return CREATE_EDGE_SOURCE_VERTICES option

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 65b7892..2805c26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.conf;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
@@ -32,8 +35,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
 import org.apache.giraph.factories.DefaultEdgeValueFactory;
-import org.apache.giraph.factories.DefaultIncomingMessageValueFactory;
-import org.apache.giraph.factories.DefaultOutgoingMessageValueFactory;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.factories.DefaultVertexIdFactory;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.EdgeValueFactory;
@@ -78,9 +80,6 @@ import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Constants used all over Giraph for configuration.
  */
@@ -169,17 +168,11 @@ public interface GiraphConstants {
       ClassConfOption.create("giraph.edgeValueFactoryClass",
           DefaultEdgeValueFactory.class, EdgeValueFactory.class,
           "Edge value factory class - optional");
-  /** Incoming message value factory class - optional */
-  ClassConfOption<MessageValueFactory>
-  INCOMING_MESSAGE_VALUE_FACTORY_CLASS =
-      ClassConfOption.create("giraph.incomingMessageValueFactoryClass",
-          DefaultIncomingMessageValueFactory.class, MessageValueFactory.class,
-          "Incoming message value factory class - optional");
   /** Outgoing message value factory class - optional */
   ClassConfOption<MessageValueFactory>
   OUTGOING_MESSAGE_VALUE_FACTORY_CLASS =
       ClassConfOption.create("giraph.outgoingMessageValueFactoryClass",
-          DefaultOutgoingMessageValueFactory.class, MessageValueFactory.class,
+          DefaultMessageValueFactory.class, MessageValueFactory.class,
           "Outgoing message value factory class - optional");
 
   /** Vertex edges class - optional */
@@ -381,10 +374,6 @@ public interface GiraphConstants {
   ClassConfOption<Writable> EDGE_VALUE_CLASS =
       ClassConfOption.create("giraph.edgeValueClass", null, Writable.class,
           "Edge value class");
-  /** Incoming message value class */
-  ClassConfOption<Writable> INCOMING_MESSAGE_VALUE_CLASS =
-      ClassConfOption.create("giraph.incomingMessageValueClass", null,
-          Writable.class, "Incoming message value class");
   /** Outgoing message value class */
   ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS =
       ClassConfOption.create("giraph.outgoingMessageValueClass", null,

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
index 6c854f3..f98912d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
@@ -17,6 +17,14 @@
  */
 package org.apache.giraph.conf;
 
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
@@ -25,15 +33,6 @@ import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
-import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
 /**
  * Holder for the generic types that describe user's graph.
  *
@@ -49,8 +48,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
   private Class<V> vertexValueClass;
   /** Edge value class */
   private Class<E> edgeValueClass;
-  /** Incoming message value class */
-  private Class<? extends Writable> incomingMessageValueClass;
   /** Outgoing message value class */
   private Class<? extends Writable> outgoingMessageValueClass;
   /** Vertex implementation class */
@@ -77,7 +74,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
       Class<? extends Writable> incomingMessageValueClass,
       Class<? extends Writable> outgoingMessageValueClass) {
     this.edgeValueClass = edgeValueClass;
-    this.incomingMessageValueClass = incomingMessageValueClass;
     this.outgoingMessageValueClass = outgoingMessageValueClass;
     this.vertexIdClass = vertexIdClass;
     this.vertexValueClass = vertexValueClass;
@@ -119,7 +115,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     vertexIdClass = (Class<I>) classList[0];
     vertexValueClass = (Class<V>) classList[1];
     edgeValueClass = (Class<E>) classList[2];
-    incomingMessageValueClass = (Class<? extends Writable>) classList[3];
     outgoingMessageValueClass = (Class<? extends Writable>) classList[4];
   }
 
@@ -132,7 +127,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     vertexIdClass = (Class<I>) VERTEX_ID_CLASS.get(conf);
     vertexValueClass = (Class<V>) VERTEX_VALUE_CLASS.get(conf);
     edgeValueClass = (Class<E>) EDGE_VALUE_CLASS.get(conf);
-    incomingMessageValueClass = INCOMING_MESSAGE_VALUE_CLASS.get(conf);
     outgoingMessageValueClass = OUTGOING_MESSAGE_VALUE_CLASS.get(conf);
     vertexClass = VERTEX_CLASS.get(conf);
   }
@@ -146,7 +140,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     return vertexIdClass != null &&
         vertexValueClass != null &&
         edgeValueClass != null &&
-        incomingMessageValueClass != null &&
         outgoingMessageValueClass != null;
   }
 
@@ -159,7 +152,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     VERTEX_ID_CLASS.set(conf, vertexIdClass);
     VERTEX_VALUE_CLASS.set(conf, vertexValueClass);
     EDGE_VALUE_CLASS.set(conf, edgeValueClass);
-    INCOMING_MESSAGE_VALUE_CLASS.set(conf, incomingMessageValueClass);
     OUTGOING_MESSAGE_VALUE_CLASS.set(conf, outgoingMessageValueClass);
   }
 
@@ -172,7 +164,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     VERTEX_ID_CLASS.setIfUnset(conf, vertexIdClass);
     VERTEX_VALUE_CLASS.setIfUnset(conf, vertexValueClass);
     EDGE_VALUE_CLASS.setIfUnset(conf, edgeValueClass);
-    INCOMING_MESSAGE_VALUE_CLASS.setIfUnset(conf, incomingMessageValueClass);
     OUTGOING_MESSAGE_VALUE_CLASS.setIfUnset(conf, outgoingMessageValueClass);
   }
 
@@ -180,11 +171,7 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     return edgeValueClass;
   }
 
-  public Class<? extends Writable> getIncomingMessageValueClass() {
-    return incomingMessageValueClass;
-  }
-
-  public Class<? extends Writable> getOutgoingMessageValueClass() {
+  Class<? extends Writable> getInitialOutgoingMessageValueClass() {
     return outgoingMessageValueClass;
   }
 
@@ -204,16 +191,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     this.edgeValueClass = edgeValueClass;
   }
 
-  public void setIncomingMessageValueClass(
-      Class<? extends Writable> incomingMessageValueClass) {
-    this.incomingMessageValueClass = incomingMessageValueClass;
-  }
-
-  public void setOutgoingMessageValueClass(
-      Class<? extends Writable> outgoingMessageValueClass) {
-    this.outgoingMessageValueClass = outgoingMessageValueClass;
-  }
-
   public void setVertexIdClass(Class<I> vertexIdClass) {
     this.vertexIdClass = vertexIdClass;
   }
@@ -221,4 +198,9 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
   public void setVertexValueClass(Class<V> vertexValueClass) {
     this.vertexValueClass = vertexValueClass;
   }
+
+  public void setOutgoingMessageValueClass(
+      Class<? extends Writable> outgoingMessageValueClass) {
+    this.outgoingMessageValueClass = outgoingMessageValueClass;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 381495e..967737c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -27,6 +27,7 @@ import io.netty.handler.codec.compression.SnappyFramedEncoder;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.EdgeStoreFactory;
@@ -142,8 +143,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
-    valueFactories = new ValueFactories<I, V, E>(conf);
-    valueFactories.initializeIVE(this);
+    valueFactories = new ValueFactories<I, V, E>(this);
   }
 
   /**
@@ -530,40 +530,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Get the user's subclassed
-   * {@link org.apache.giraph.combiner.MessageCombiner} class.
-   *
-   * @return User's combiner class
-   */
-  @Override
-  public Class<? extends MessageCombiner<I, ? extends Writable>>
-  getMessageCombinerClass() {
-    return classes.getMessageCombinerClass();
-  }
-
-  /**
-   * Create a user combiner class
-   *
-   * @param <M> Message data
-   * @return Instantiated user combiner class
-   */
-  @SuppressWarnings("rawtypes")
-  public <M extends Writable> MessageCombiner<I, M> createMessageCombiner() {
-    Class<? extends MessageCombiner<I, M>> klass =
-        classes.getMessageCombinerClass();
-    return ReflectionUtils.newInstance(klass, this);
-  }
-
-  /**
-   * Check if user set a combiner
-   *
-   * @return True iff user set a combiner class
-   */
-  public boolean useMessageCombiner() {
-    return classes.hasMessageCombinerClass();
-  }
-
-  /**
-   * Get the user's subclassed
    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
    *
    * @return User's vertex value combiner class
@@ -905,47 +871,92 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @return User's vertex message value class
    */
   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
-    return classes.getIncomingMessageValueClass();
+    return classes.getIncomingMessageClasses().getMessageClass();
   }
 
   /**
-   * Get the factory for creating incoming message values
+   * Get the user's subclassed outgoing message value class.
    *
-   * @param <M> Incoming Message type
-   * @return MessageValueFactory
+   * @param <M> Message type
+   * @return User's vertex message value class
    */
-  public <M extends Writable> MessageValueFactory<M>
-  getIncomingMessageValueFactory() {
-    Class<? extends MessageValueFactory> klass =
-        valueFactories.getInMsgFactoryClass();
-    MessageValueFactory<M> factory = ReflectionUtils.newInstance(klass, this);
-    factory.initialize(this);
-    return factory;
+  public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
+    return classes.getOutgoingMessageClasses().getMessageClass();
   }
 
   /**
-   * Get the user's subclassed outgoing message value class.
+   * Get incoming message classes
+   * @param <M> message type
+   * @return incoming message classes
+   */
+  public <M extends Writable>
+  MessageClasses<I, M> getIncomingMessageClasses() {
+    return classes.getIncomingMessageClasses();
+  }
+
+  /**
+   * Get outgoing message classes
+   * @param <M> message type
+   * @return outgoing message classes
+   */
+  public <M extends Writable>
+  MessageClasses<I, M> getOutgoingMessageClasses() {
+    return classes.getOutgoingMessageClasses();
+  }
+
+  /**
+   * Create new outgoing message value factory
+   * @param <M> message type
+   * @return outgoing message value factory
+   */
+  public <M extends Writable>
+  MessageValueFactory<M> createOutgoingMessageValueFactory() {
+    return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
+  }
+
+  /**
+   * Create new incoming message value factory
+   * @param <M> message type
+   * @return incoming message value factory
+   */
+  public <M extends Writable>
+  MessageValueFactory<M> createIncomingMessageValueFactory() {
+    return classes.getIncomingMessageClasses().createMessageValueFactory(this);
+  }
+
+  @Override
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    throw new IllegalArgumentException(
+        "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
+  }
+
+  /**
+   * Create a user combiner class
    *
    * @param <M> Message data
-   * @return User's vertex message value class
+   * @return Instantiated user combiner class
    */
-  public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
-    return classes.getOutgoingMessageValueClass();
+  public <M extends Writable> MessageCombiner<? super I, M>
+  createOutgoingMessageCombiner() {
+    return classes.getOutgoingMessageClasses().createMessageCombiner(this);
   }
 
   /**
-   * Get the factory for creating outgoing message values
+   * Check if user set a combiner
    *
-   * @param <M> Outgoing Message type
-   * @return MessageValueFactory
+   * @return True iff user set a combiner class
    */
-  public <M extends Writable> MessageValueFactory<M>
-  getOutgoingMessageValueFactory() {
-    Class<? extends MessageValueFactory> klass =
-        valueFactories.getOutMsgFactoryClass();
-    MessageValueFactory<M> factory = ReflectionUtils.newInstance(klass, this);
-    factory.initialize(this);
-    return factory;
+  public boolean useOutgoingMessageCombiner() {
+    return classes.getOutgoingMessageClasses().useMessageCombiner();
+  }
+
+  /**
+   * Get outgoing message encode and store type
+   * @return outgoing message encode and store type
+   */
+  public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
+    return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
   }
 
   @Override
@@ -1227,20 +1238,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @param superstepClasses SuperstepClasses
    */
   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
-    Class<? extends Computation> computationClass =
-        superstepClasses.getComputationClass();
-    classes.setComputationClass(computationClass);
-    Class<? extends Writable> incomingMsgValueClass =
-        superstepClasses.getIncomingMessageClass();
-    if (incomingMsgValueClass != null) {
-      classes.setIncomingMessageValueClass(incomingMsgValueClass);
-    }
-    Class<? extends Writable> outgoingMsgValueClass =
-        superstepClasses.getOutgoingMessageClass();
-    if (outgoingMsgValueClass != null) {
-      classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
-    }
-    classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
+    superstepClasses.updateGiraphClasses(classes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
new file mode 100644
index 0000000..b2a09c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for containing all items that define message being sent,
+ * including it's value factory and combiner.
+ *
+ * @param <I>
+ * @param <M>
+ */
+public interface MessageClasses
+    <I extends WritableComparable, M extends Writable> extends Writable {
+  /**
+   * Get message class
+   * @return message class
+   */
+  Class<M> getMessageClass();
+
+  /**
+   * Create new instance of MessageValueFactory
+   * @param conf Configuration
+   * @return message value factory
+   */
+  MessageValueFactory<M> createMessageValueFactory(
+      ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Create new instance of MessageCombiner
+   * @param conf Configuration
+   * @return message combiner
+   */
+  MessageCombiner<? super I, M> createMessageCombiner(
+      ImmutableClassesGiraphConfiguration
+        <I, ? extends Writable, ? extends Writable> conf);
+
+  /**
+   * Has message combiner been specified
+   * @return has message combiner been specified
+   */
+  boolean useMessageCombiner();
+
+  /**
+   * Get MessageEncodeAndStoreType
+   * @return message encode and store type
+   */
+  MessageEncodeAndStoreType getMessageEncodeAndStoreType();
+
+  /**
+   * Creates a fresh copy of this object,
+   * to be used and changed for new superstep.
+   * (that should be independent from the previous one)
+   *
+   * @return message classes
+   */
+  MessageClasses<I, M> createCopyForNewSuperstep();
+
+  /**
+   * Verify if types are internally consistent
+   *
+   * @param conf Configuration
+   */
+  void verifyConsistent(ImmutableClassesGiraphConfiguration conf);
+}


Mime
View raw message