giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edu...@apache.org
Subject [2/3] git commit: updated refs/heads/trunk to 5d0b81a
Date Fri, 10 Apr 2015 19:20:28 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
index 375d054..14f889d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBoolean {
   private boolean vertexValue;
   /** data for edge value */
   private boolean edgeValue;
-  /** data for incoming message */
-  private boolean incomingMessage;
   /** data for outgoing message */
   private boolean outgoingMessage;
 
@@ -60,7 +58,6 @@ public class PerGraphTypeBoolean {
     setVertexId(options.getVertexId(), conf);
     setVertexValue(options.getVertexValue(), conf);
     setEdgeValue(options.getEdgeValue(), conf);
-    setIncomingMessage(options.getIncomingMessage(), conf);
     setOutgoingMessage(options.getOutgoingMessage(), conf);
   }
 
@@ -95,16 +92,6 @@ public class PerGraphTypeBoolean {
   }
 
   /**
-   * Set the incoming message value data from the option
-   *
-   * @param option EnumConfOption option to use
-   * @param conf Configuration
-   */
-  public void setIncomingMessage(BooleanConfOption option, Configuration conf) {
-    incomingMessage = option.get(conf);
-  }
-
-  /**
    * Set the outgoing message value data from the option
    *
    * @param option EnumConfOption option to use
@@ -128,8 +115,6 @@ public class PerGraphTypeBoolean {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -142,10 +127,6 @@ public class PerGraphTypeBoolean {
     return edgeValue;
   }
 
-  public boolean getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public boolean getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
index adfa979..d622546 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBooleanConfOption {
   private final BooleanConfOption vertexValue;
   /** option for edge value */
   private final BooleanConfOption edgeValue;
-  /** option for incoming message */
-  private final BooleanConfOption incomingMessage;
   /** option for outgoing message */
   private final BooleanConfOption outgoingMessage;
 
@@ -50,8 +48,6 @@ public class PerGraphTypeBooleanConfOption {
         defaultValue, description);
     edgeValue = new BooleanConfOption(keyPrefix + ".edge.value",
         defaultValue, description);
-    incomingMessage = new BooleanConfOption(keyPrefix + ".incoming.message",
-        defaultValue, description);
     outgoingMessage = new BooleanConfOption(keyPrefix + ".outgoing.message",
         defaultValue, description);
   }
@@ -70,8 +66,6 @@ public class PerGraphTypeBooleanConfOption {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -95,10 +89,6 @@ public class PerGraphTypeBooleanConfOption {
     return edgeValue;
   }
 
-  public BooleanConfOption getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public BooleanConfOption getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
index 7003709..199b2f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
   private T vertexValue;
   /** data for edge value */
   private T edgeValue;
-  /** data for incoming message */
-  private T incomingMessage;
   /** data for outgoing message */
   private T outgoingMessage;
 
@@ -63,7 +61,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
     setVertexId(options.getVertexId(), conf);
     setVertexValue(options.getVertexValue(), conf);
     setEdgeValue(options.getEdgeValue(), conf);
-    setIncomingMessage(options.getIncomingMessage(), conf);
     setOutgoingMessage(options.getOutgoingMessage(), conf);
   }
 
@@ -98,16 +95,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
   }
 
   /**
-   * Set the incoming message value data from the option
-   *
-   * @param option EnumConfOption option to use
-   * @param conf Configuration
-   */
-  public void setIncomingMessage(EnumConfOption<T> option, Configuration conf) {
-    incomingMessage = option.get(conf);
-  }
-
-  /**
    * Set the outgoing message value data from the option
    *
    * @param option EnumConfOption option to use
@@ -131,8 +118,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -145,10 +130,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
     return edgeValue;
   }
 
-  public T getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public T getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
index 8ae4576..c5041e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
   private final EnumConfOption<T> vertexValue;
   /** option for edge value */
   private final EnumConfOption<T> edgeValue;
-  /** option for incoming message */
-  private final EnumConfOption<T> incomingMessage;
   /** option for outgoing message */
   private final EnumConfOption<T> outgoingMessage;
 
@@ -53,8 +51,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
         defaultValue, description);
     edgeValue = EnumConfOption.create(keyPrefix + ".edge.value",
         klass, defaultValue, description);
-    incomingMessage = EnumConfOption.create(keyPrefix + ".incoming.message",
-        klass, defaultValue, description);
     outgoingMessage = EnumConfOption.create(keyPrefix + ".outgoing.message",
         klass, defaultValue, description);
   }
@@ -89,8 +85,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -114,10 +108,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
     return edgeValue;
   }
 
-  public EnumConfOption<T> getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public EnumConfOption<T> getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
deleted file mode 100644
index 5551439..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Objects;
-
-/**
- * Factory class to create default message values.
- *
- * @param <M> Message Value
- */
-public abstract class AbstractMessageValueFactory<M extends Writable>
-    implements MessageValueFactory<M> {
-  /** Message value class */
-  private Class<M> messageValueClass;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Get the message value class from the configuration
-   *
-   * @param conf Configuration
-   * @return message value Class
-   */
-  protected abstract Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf);
-
-  @Override
-  public Class<M> getValueClass() {
-    return messageValueClass;
-  }
-
-  @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-    messageValueClass = extractMessageValueClass(conf);
-  }
-
-  @Override public M newInstance() {
-    return WritableUtils.createWritable(messageValueClass, conf);
-  }
-
-  @Override public String toString() {
-    return Objects.toStringHelper(this)
-        .add("messageValueClass", messageValueClass)
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
index 998c06f..a838e0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -29,21 +30,18 @@ import org.apache.hadoop.io.Writable;
  * @param <E> Edge Value
  */
 public class DefaultEdgeValueFactory<E extends Writable>
-    implements EdgeValueFactory<E> {
+    implements EdgeValueFactory<E>, GiraphConfigurationSettable {
   /** Cached edge value class. */
   private Class<E> edgeValueClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
-  @Override public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     edgeValueClass = conf.getEdgeValueClass();
   }
 
-  @Override public Class<E> getValueClass() {
-    return edgeValueClass;
-  }
-
   @Override public E newInstance() {
     return WritableUtils.createWritable(edgeValueClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
deleted file mode 100644
index bdf2966..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default incoming message values.
- *
- * @param <M> Incoming Message Value
- */
-public class DefaultIncomingMessageValueFactory<M extends Writable> extends
-    AbstractMessageValueFactory<M> {
-  @Override protected Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf) {
-    return conf.getIncomingMessageValueClass();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
new file mode 100644
index 0000000..b10d30c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.factories;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Factory class to create default message values.
+ *
+ * @param <M> Message Value
+ */
+public class DefaultMessageValueFactory<M extends Writable>
+    implements MessageValueFactory<M> {
+  /** Message value class */
+  private final Class<M> messageValueClass;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor
+   * @param messageValueClass message value class
+   * @param conf configuration
+   */
+  public DefaultMessageValueFactory(Class<M> messageValueClass,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.messageValueClass = messageValueClass;
+    this.conf = conf;
+  }
+
+  @Override public M newInstance() {
+    return WritableUtils.createWritable(messageValueClass, conf);
+  }
+
+  @Override public String toString() {
+    return Objects.toStringHelper(this)
+        .add("messageValueClass", messageValueClass)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
deleted file mode 100644
index a42d5e2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default outgoing message values.
- *
- * @param <M> Outgoing Message Value
- */
-public class DefaultOutgoingMessageValueFactory<M extends Writable> extends
-    AbstractMessageValueFactory<M> {
-  @Override protected Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf) {
-    return conf.getOutgoingMessageValueClass();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
index 305548c..d36ae57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.WritableComparable;
@@ -28,24 +29,19 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex ID
  */
 public class DefaultVertexIdFactory<I extends WritableComparable>
-    implements VertexIdFactory<I> {
+    implements VertexIdFactory<I>, GiraphConfigurationSettable {
   /** Cached vertex value class. */
   private Class<I> vertexIdClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     vertexIdClass = conf.getVertexIdClass();
   }
 
   @Override
-  public Class<I> getValueClass() {
-    return vertexIdClass;
-  }
-
-  @Override
   public I newInstance() {
     return WritableUtils.createWritable(vertexIdClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
index 634f0d5..2cc124c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -29,24 +30,19 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value
  */
 public class DefaultVertexValueFactory<V extends Writable>
-    implements VertexValueFactory<V> {
+    implements VertexValueFactory<V>, GiraphConfigurationSettable {
   /** Cached vertex value class. */
   private Class<V> vertexValueClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     vertexValueClass = conf.getVertexValueClass();
   }
 
   @Override
-  public Class<V> getValueClass() {
-    return vertexValueClass;
-  }
-
-  @Override
   public V newInstance() {
     return WritableUtils.createWritable(vertexValueClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
index 806664b..e8c1c6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 
@@ -40,13 +39,6 @@ public class TestMessageValueFactory<M extends Writable>
     this.klass = klass;
   }
 
-  @Override public Class<M> getValueClass() {
-    return klass;
-  }
-
-  @Override public void initialize(
-      ImmutableClassesGiraphConfiguration conf) { }
-
   @Override public M newInstance() {
     return ReflectionUtils.newInstance(klass);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
index 733d5f2..00f35f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
@@ -17,17 +17,14 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_FACTORY_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Holder for factories to create user types.
  *
@@ -47,12 +44,6 @@ public class ValueFactories<I extends WritableComparable,
   private final VertexValueFactory<V> vertexValueFactory;
   /** Edge value factory. */
   private final EdgeValueFactory<E> edgeValueFactory;
-  // Note that for messages we store the class not the factory itself, because
-  // the factory instance may change per-superstep if the graph types change.
-  /** Incoming message value factory class */
-  private final Class<? extends MessageValueFactory> inMsgFactoryClass;
-  /** Outgoing message value factory class */
-  private final Class<? extends MessageValueFactory> outMsgFactoryClass;
 
   /**
    * Constructor reading from Configuration
@@ -63,19 +54,6 @@ public class ValueFactories<I extends WritableComparable,
     vertexIdFactory = VERTEX_ID_FACTORY_CLASS.newInstance(conf);
     vertexValueFactory = VERTEX_VALUE_FACTORY_CLASS.newInstance(conf);
     edgeValueFactory = EDGE_VALUE_FACTORY_CLASS.newInstance(conf);
-    inMsgFactoryClass = INCOMING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
-    outMsgFactoryClass = OUTGOING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
-  }
-
-  /**
-   * Initialize all of the factories.
-   *
-   * @param conf ImmutableClassesGiraphConfiguration
-   */
-  public void initializeIVE(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
-    vertexIdFactory.initialize(conf);
-    vertexValueFactory.initialize(conf);
-    edgeValueFactory.initialize(conf);
   }
 
   public EdgeValueFactory<E> getEdgeValueFactory() {
@@ -89,12 +67,4 @@ public class ValueFactories<I extends WritableComparable,
   public VertexValueFactory<V> getVertexValueFactory() {
     return vertexValueFactory;
   }
-
-  public Class<? extends MessageValueFactory> getInMsgFactoryClass() {
-    return inMsgFactoryClass;
-  }
-
-  public Class<? extends MessageValueFactory> getOutMsgFactoryClass() {
-    return outMsgFactoryClass;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
index 5725061..de56929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
@@ -17,7 +17,8 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.Serializable;
+
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -25,30 +26,11 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <W> Writable type
  */
-public interface ValueFactory<W extends Writable> {
-  /**
-   * Initialize factory settings from the conf.
-   * This gets called on startup and also if there are changes to the message
-   * classes used. For example if the user's
-   * {@link org.apache.giraph.master.MasterCompute} changes the
-   * {@link org.apache.giraph.graph.Computation} and the next superstep has a
-   * different message value type.
-   *
-   * @param conf Configuration
-   */
-  void initialize(ImmutableClassesGiraphConfiguration conf);
-
+public interface ValueFactory<W extends Writable> extends Serializable {
   /**
    * Create a new value.
    *
    * @return new value.
    */
   W newInstance();
-
-  /**
-   * Get the java Class representing messages this factory creates
-   *
-   * @return Class<M>
-   */
-  Class<W> getValueClass();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 996159f..226087c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -17,6 +17,12 @@
  */
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStore;
@@ -47,12 +53,6 @@ import com.google.common.collect.Lists;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Histogram;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
 /**
  * Compute as many vertex partitions as possible.  Every thread will has its
  * own instance of WorkerClientRequestProcessor to send requests.  Note that
@@ -139,7 +139,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     // Thread initialization (for locality)
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
-            context, configuration, serviceWorker);
+            context, configuration, serviceWorker,
+            configuration.getOutgoingMessageEncodeAndStoreType().
+              useOneMessageToManyIdsEncoding());
     WorkerThreadGlobalCommUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
     WorkerContext workerContext = serviceWorker.getWorkerContext();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
index 4a0ac8f..b02159e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
@@ -90,27 +90,6 @@ public enum GraphType {
       return conf.getEdgeValueFactory();
     }
   },
-  /** Incoming message value */
-  INCOMING_MESSAGE_VALUE {
-    @Override
-    public ClassConfOption<? extends Writable> writableConfOption() {
-      return GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-    }
-    @Override
-    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
-      return GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
-    }
-    @Override
-    public <T extends Writable> Class<T> get(
-        ImmutableClassesGiraphConfiguration conf) {
-      return conf.getIncomingMessageValueClass();
-    }
-    @Override
-    public <T extends Writable> ValueFactory<T> factory(
-        ImmutableClassesGiraphConfiguration conf) {
-      return conf.getIncomingMessageValueFactory();
-    }
-  },
   /** Outgoing message value */
   OUTGOING_MESSAGE_VALUE {
     @Override
@@ -129,7 +108,7 @@ public enum GraphType {
     @Override
     public <T extends Writable> ValueFactory<T> factory(
         ImmutableClassesGiraphConfiguration conf) {
-      return conf.getOutgoingMessageValueFactory();
+      return conf.createOutgoingMessageValueFactory();
     }
   };
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 73a7aab..38f14d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.job;
 
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -25,8 +29,8 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -36,10 +40,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
 /**
  * GiraphConfigurationValidator attempts to verify the consistency of
  * user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -81,7 +81,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /**
    * The Configuration object for use in the validation test.
    */
-  private ImmutableClassesGiraphConfiguration conf;
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor to execute the validation test, throws
@@ -126,7 +126,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    * @return outgoing message value type
    */
   private Class<? extends Writable> outgoingMessageValueType() {
-    return conf.getGiraphTypes().getOutgoingMessageValueClass();
+    return conf.getOutgoingMessageValueClass();
   }
 
   /**
@@ -266,11 +266,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    * generic params match the job.
    */
   private void verifyMessageCombinerGenericTypes() {
-    Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
-      conf.getMessageCombinerClass();
-    if (messageCombinerClass != null) {
+    MessageCombiner<I, M2> messageCombiner =
+      conf.createOutgoingMessageCombiner();
+    if (messageCombiner != null) {
       Class<?>[] classList =
-          getTypeArguments(MessageCombiner.class, messageCombinerClass);
+          getTypeArguments(MessageCombiner.class, messageCombiner.getClass());
       checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
           MessageCombiner.class, "vertex index");
       checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
@@ -356,7 +356,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     if (classList[index] == null) {
       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
     } else if (!classList[index].equals(classFromComputation)) {
-      throw new IllegalArgumentException(
+      throw new IllegalStateException(
           "checkClassTypes: " + typeName + " types not equal, " +
               "computation - " + classFromComputation +
               ", " + klass.getSimpleName() + " - " +
@@ -378,7 +378,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     if (classList[index] == null) {
       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
     } else if (!classList[index].isAssignableFrom(classFromComputation)) {
-      throw new IllegalArgumentException(
+      throw new IllegalStateException(
           "checkClassTypes: " + typeName + " types not assignable, " +
               "computation - " + classFromComputation +
               ", " + klass.getSimpleName() + " - " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
index 3266d9b..1eff92b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
@@ -86,10 +86,6 @@ public class JythonOptions {
   public static final JythonGraphTypeOptions JYTHON_EDGE_VALUE =
       new JythonGraphTypeOptions(GraphType.EDGE_VALUE);
 
-  /** incoming message value options */
-  public static final JythonGraphTypeOptions JYTHON_IN_MSG_VALUE =
-      new JythonGraphTypeOptions(GraphType.INCOMING_MESSAGE_VALUE);
-
   /** outgonig message value options */
   public static final JythonGraphTypeOptions JYTHON_OUT_MSG_VALUE =
       new JythonGraphTypeOptions(GraphType.OUTGOING_MESSAGE_VALUE);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
index 0d3d34a..c0b286f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.jython.factories;
 
 import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.factories.ValueFactory;
@@ -37,7 +38,7 @@ import org.python.core.PyObject;
  * @param <W> writable type
  */
 public abstract class JythonFactoryBase<W extends Writable>
-    implements ValueFactory<W> {
+    implements ValueFactory<W>, GiraphConfigurationSettable {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(JythonFactoryBase.class);
 
@@ -84,7 +85,7 @@ public abstract class JythonFactoryBase<W extends Writable>
   }
 
   @Override
-  public void initialize(
+  public void setConf(
       ImmutableClassesGiraphConfiguration conf) {
     jythonClassName = jythonClassNameOption().get(conf);
     useWrapper = conf.getValueNeedsWrappers().get(getGraphType());
@@ -105,14 +106,6 @@ public abstract class JythonFactoryBase<W extends Writable>
     }
   }
 
-  @Override public Class<W> getValueClass() {
-    if (useWrapper) {
-      return (Class<W>) JythonWritableWrapper.class;
-    } else {
-      return (Class<W>) writableValueClass();
-    }
-  }
-
   /**
    * Use this factory in the {@link org.apache.hadoop.conf.Configuration}
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
deleted file mode 100644
index e77e9c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.jython.factories;
-
-import org.apache.giraph.jython.JythonOptions;
-import org.apache.hadoop.io.Writable;
-
-/**
- * {@link MessageValueFactory} that creates incoming message values which are
- * Jython classes.
- *
- * @param <M> Incoming Message Value
- */
-public class JythonIncomingMessageValueFactory<M extends Writable>
-    extends JythonMessageValueFactory<M> {
-  @Override
-  public JythonOptions.JythonGraphTypeOptions getOptions() {
-    return JythonOptions.JYTHON_IN_MSG_VALUE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
index d2f8d9f..f798f51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
@@ -32,9 +32,4 @@ public abstract class JythonMessageValueFactory<M extends Writable>
   public M newInstance() {
     return (M) newJythonClassInstance();
   }
-
-  @Override
-  public Class<M> getValueClass() {
-    return (Class<M>) writableValueClass();
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e942157..0b56a4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -50,8 +50,8 @@ import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterServer;
@@ -197,7 +197,7 @@ public class BspServiceMaster<I extends WritableComparable,
   /** Current checkpoint status */
   private CheckpointStatus checkpointStatus;
   /** Checks if checkpointing supported */
-  private CheckpointSupportedChecker checkpointSupportedChecker;
+  private final CheckpointSupportedChecker checkpointSupportedChecker;
 
   /**
    * Constructor for setting up the master.
@@ -803,7 +803,8 @@ public class BspServiceMaster<I extends WritableComparable,
     GlobalStats globalStats = new GlobalStats();
     globalStats.readFields(finalizedStream);
     updateCounters(globalStats);
-    SuperstepClasses superstepClasses = new SuperstepClasses();
+    SuperstepClasses superstepClasses =
+        SuperstepClasses.createToRead(getConfiguration());
     superstepClasses.readFields(finalizedStream);
     getConfiguration().updateSuperstepClasses(superstepClasses);
     int prefixFileCount = finalizedStream.readInt();
@@ -1576,7 +1577,7 @@ public class BspServiceMaster<I extends WritableComparable,
         GiraphStats.getInstance().getEdges().getValue(),
         getContext());
     SuperstepClasses superstepClasses =
-      new SuperstepClasses(getConfiguration());
+        SuperstepClasses.createAndExtractTypes(getConfiguration());
     masterCompute.setGraphState(graphState);
     masterCompute.setSuperstepClasses(superstepClasses);
     return superstepClasses;
@@ -1732,8 +1733,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // match) and if the computation is halted, no need to check any of
     // the types.
     if (!globalStats.getHaltComputation()) {
-      superstepClasses.verifyTypesMatch(
-          getConfiguration(), getSuperstep() != 0);
+      superstepClasses.verifyTypesMatch(getSuperstep() > 0);
     }
     getConfiguration().updateSuperstepClasses(superstepClasses);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 85496c2..50e3b36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -22,10 +22,12 @@ import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
@@ -173,8 +175,10 @@ public abstract class MasterCompute
 
   /**
    * Set incoming message class to be used
+   *
    * @param incomingMessageClass incoming message class
    */
+  @Deprecated
   public final void setIncomingMessage(
       Class<? extends Writable> incomingMessageClass) {
     superstepClasses.setIncomingMessageClass(incomingMessageClass);
@@ -182,6 +186,7 @@ public abstract class MasterCompute
 
   /**
    * Set outgoing message class to be used
+   *
    * @param outgoingMessageClass outgoing message class
    */
   public final void setOutgoingMessage(
@@ -189,6 +194,17 @@ public abstract class MasterCompute
     superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
   }
 
+  /**
+   * Set outgoing message classes to be used
+   *
+   * @param outgoingMessageClasses outgoing message classes
+   */
+  public void setOutgoingMessageClasses(
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
+  }
+
   @Override
   public final <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 8145109..8653c96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -23,17 +23,21 @@ import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Modifier;
 
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.DefaultMessageClasses;
+import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
+import org.python.google.common.base.Preconditions;
 
 /**
  * Holds Computation and MessageCombiner class.
@@ -41,115 +45,170 @@ import org.apache.log4j.Logger;
 public class SuperstepClasses implements Writable {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /** Computation class to be used in the following superstep */
   private Class<? extends Computation> computationClass;
-  /** MessageCombiner class to be used in the following superstep */
-  private Class<? extends MessageCombiner> messageCombinerClass;
-  /** Incoming message class to be used in the following superstep */
-  private Class<? extends Writable> incomingMessageClass;
-  /** Outgoing message class to be used in the following superstep */
-  private Class<? extends Writable> outgoingMessageClass;
+  /** Incoming message classes, immutable, only here for checking */
+  private MessageClasses<? extends WritableComparable, ? extends Writable>
+  incomingMessageClasses;
+  /** Outgoing message classes */
+  private MessageClasses<? extends WritableComparable, ? extends Writable>
+  outgoingMessageClasses;
 
   /**
-   * Default constructor
+   * Constructor
+   * @param conf Configuration
+   * @param computationClass computation class
+   * @param incomingMessageClasses incoming message classes
+   * @param outgoingMessageClasses outgoing message classes
    */
-  public SuperstepClasses() {
+  public SuperstepClasses(
+      ImmutableClassesGiraphConfiguration conf,
+      Class<? extends Computation> computationClass,
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        incomingMessageClasses,
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    this.conf = conf;
+    this.computationClass = computationClass;
+    this.incomingMessageClasses = incomingMessageClasses;
+    this.outgoingMessageClasses = outgoingMessageClasses;
   }
 
   /**
-   * Constructor
-   *
+   * Create empty superstep classes, readFields needs to be called afterwards
    * @param conf Configuration
+   * @return Superstep classes
    */
-  @SuppressWarnings("unchecked")
-  public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
-    this(conf.getComputationClass(), conf.getMessageCombinerClass());
+  public static SuperstepClasses createToRead(
+      ImmutableClassesGiraphConfiguration conf) {
+    return new SuperstepClasses(conf, null, null, null);
   }
 
   /**
-   * Constructor
-   *
-   * @param computationClass Computation class
-   * @param messageCombinerClass MessageCombiner class
+   * Create superstep classes by initializing from current state
+   * in configuration
+   * @param conf Configuration
+   * @return Superstep classes
    */
-  public SuperstepClasses(Class<? extends Computation> computationClass,
-      Class<? extends MessageCombiner> messageCombinerClass) {
-    this.computationClass = computationClass;
-    this.messageCombinerClass =
-        messageCombinerClass;
+  public static SuperstepClasses createAndExtractTypes(
+      ImmutableClassesGiraphConfiguration conf) {
+    return new SuperstepClasses(
+        conf,
+        conf.getComputationClass(),
+        conf.getOutgoingMessageClasses(),
+        conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
   }
 
   public Class<? extends Computation> getComputationClass() {
     return computationClass;
   }
 
-  public Class<? extends MessageCombiner> getMessageCombinerClass() {
-    return messageCombinerClass;
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getOutgoingMessageClasses() {
+    return outgoingMessageClasses;
   }
 
   /**
-   * Get incoming message class, either set directly, or through Computation
-   * @return incoming message class
+   * Sets outgoing MessageClasses for next superstep.
+   * Should not be used together with
+   * setMessageCombinerClass/setOutgoingMessageClass methods.
+   *
+   * @param outgoingMessageClasses outgoing message classes
    */
-  public Class<? extends Writable> getIncomingMessageClass() {
-    if (incomingMessageClass != null) {
-      return incomingMessageClass;
-    }
-    if (computationClass == null) {
-      return null;
-    }
-    Class[] computationTypes = ReflectionUtils.getTypeArguments(
-        TypesHolder.class, computationClass);
-    return computationTypes[3];
+  public void setOutgoingMessageClasses(
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    this.outgoingMessageClasses = outgoingMessageClasses;
   }
 
   /**
-   * Get outgoing message class, either set directly, or through Computation
-   * @return outgoing message class
+   * Set computation class
+   * @param computationClass computation class
    */
-  public Class<? extends Writable> getOutgoingMessageClass() {
-    if (outgoingMessageClass != null) {
-      return outgoingMessageClass;
-    }
-    if (computationClass == null) {
-      return null;
-    }
-    Class[] computationTypes = ReflectionUtils.getTypeArguments(
-        TypesHolder.class, computationClass);
-    return computationTypes[4];
-  }
-
   public void setComputationClass(
       Class<? extends Computation> computationClass) {
     this.computationClass = computationClass;
+
+    if (computationClass != null) {
+      Class[] computationTypes = ReflectionUtils.getTypeArguments(
+          TypesHolder.class, computationClass);
+      if (computationTypes[4] != null &&
+          outgoingMessageClasses instanceof DefaultMessageClasses) {
+        ((DefaultMessageClasses) outgoingMessageClasses)
+          .setIfNotModifiedMessageClass(computationTypes[4]);
+      }
+    }
   }
 
+  /**
+   * Set message combiner class.
+   * Should not be used together with setOutgoingMessageClasses
+   * (throws exception if called with it),
+   * as it is unnecessary to do so.
+   *
+   * @param messageCombinerClass message combiner class
+   */
   public void setMessageCombinerClass(
       Class<? extends MessageCombiner> messageCombinerClass) {
-    this.messageCombinerClass = messageCombinerClass;
+    Preconditions.checkState(
+        outgoingMessageClasses instanceof DefaultMessageClasses);
+    ((DefaultMessageClasses) outgoingMessageClasses).
+        setMessageCombinerClass(messageCombinerClass);
   }
 
+  /**
+   * Set incoming message class
+   * @param incomingMessageClass incoming message class
+   */
+  @Deprecated
   public void setIncomingMessageClass(
       Class<? extends Writable> incomingMessageClass) {
-    this.incomingMessageClass = incomingMessageClass;
+    if (!incomingMessageClasses.getMessageClass().
+        equals(incomingMessageClass)) {
+      throw new IllegalArgumentException(
+          "Cannot change incoming message class from " +
+          incomingMessageClasses.getMessageClass() +
+          " previously, to " + incomingMessageClass);
+    }
   }
 
+  /**
+   * Set outgoing message class.
+   * Should not be used together with setOutgoingMessageClasses
+   * (throws exception if called with it),
+   * as it is unnecessary to do so.
+   *
+   * @param outgoingMessageClass outgoing message class
+   */
   public void setOutgoingMessageClass(
       Class<? extends Writable> outgoingMessageClass) {
-    this.outgoingMessageClass = outgoingMessageClass;
+    Preconditions.checkState(
+        outgoingMessageClasses instanceof DefaultMessageClasses);
+    ((DefaultMessageClasses) outgoingMessageClasses).
+        setMessageClass(outgoingMessageClass);
+  }
+
+  /**
+   * Get message combiner class
+   * @return message combiner class
+   */
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    MessageCombiner combiner =
+        outgoingMessageClasses.createMessageCombiner(conf);
+    return combiner != null ? combiner.getClass() : null;
   }
 
   /**
    * Verify that types of current Computation and MessageCombiner are valid.
    * If types don't match an {@link IllegalStateException} will be thrown.
    *
-   * @param conf Configuration to verify this with
    * @param checkMatchingMesssageTypes Check that the incoming/outgoing
    *                                   message types match
    */
-  public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf,
-                               boolean checkMatchingMesssageTypes) {
+  public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
     // In some cases, for example when using Jython, the Computation class may
     // not be set. This is because it is created by a ComputationFactory
     // dynamically and not known ahead of time. In this case there is nothing to
@@ -160,91 +219,55 @@ public class SuperstepClasses implements Writable {
 
     Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
         TypesHolder.class, computationClass);
-    verifyTypes(conf.getVertexIdClass(), computationTypes[0],
+    ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
         "Vertex id", computationClass);
-    verifyTypes(conf.getVertexValueClass(), computationTypes[1],
+    ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
         "Vertex value", computationClass);
-    verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
+    ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
         "Edge value", computationClass);
 
-    Class<?> incomingMessageType = getIncomingMessageClass();
-    Class<?> outgoingMessageType = getOutgoingMessageClass();
-
     if (checkMatchingMesssageTypes) {
-      verifyTypes(incomingMessageType, conf.getOutgoingMessageValueClass(),
-          "New incoming and previous outgoing message", computationClass);
-    }
-    if (outgoingMessageType.isInterface()) {
-      throw new IllegalStateException("verifyTypesMatch: " +
-          "Message type must be concrete class " + outgoingMessageType);
-    }
-    if (Modifier.isAbstract(outgoingMessageType.getModifiers())) {
-      throw new IllegalStateException("verifyTypesMatch: " +
-          "Message type can't be abstract class" + outgoingMessageType);
-    }
-    if (messageCombinerClass != null) {
-      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
-          MessageCombiner.class, messageCombinerClass);
-      verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
-          "Vertex id", messageCombinerClass);
-      verifyTypes(outgoingMessageType, combinerTypes[1],
-          "Outgoing message", messageCombinerClass);
+      ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
+          computationTypes[3], "Incoming message type", computationClass);
     }
+
+    ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
+        computationTypes[4], "Outgoing message type", computationClass);
+
+    outgoingMessageClasses.verifyConsistent(conf);
   }
 
   /**
-   * Verify that found type matches the expected type. If types don't match an
-   * {@link IllegalStateException} will be thrown.
-   *
-   * @param expected Expected type
-   * @param actual Actual type
-   * @param typeDesc String description of the type (for exception description)
-   * @param mainClass Class in which the actual type was found (for exception
-   *                  description)
+   * Update GiraphClasses with updated types
+   * @param classes Giraph classes
    */
-  private void verifyTypes(Class<?> expected, Class<?> actual,
-      String typeDesc, Class<?> mainClass) {
-    if (!expected.equals(actual)) {
-      if (actual.isAssignableFrom(expected)) {
-        LOG.warn("verifyTypes: proceeding with assignable types : " +
-          typeDesc + " types, in " + mainClass.getName() + " " + expected +
-          " expected, but " + actual + " found");
-      } else {
-        throw new IllegalStateException("verifyTypes: " + typeDesc +
-            " types " + "don't match, in " + mainClass.getName() + " " +
-            expected + " expected, but " + actual + " found");
-      }
-    }
+  public void updateGiraphClasses(GiraphClasses classes) {
+    classes.setComputationClass(computationClass);
+    classes.setIncomingMessageClasses(incomingMessageClasses);
+    classes.setOutgoingMessageClasses(outgoingMessageClasses);
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
     WritableUtils.writeClass(computationClass, output);
-    WritableUtils.writeClass(messageCombinerClass, output);
-    WritableUtils.writeClass(incomingMessageClass, output);
-    WritableUtils.writeClass(outgoingMessageClass, output);
+    WritableUtils.writeWritableObject(incomingMessageClasses, output);
+    WritableUtils.writeWritableObject(outgoingMessageClasses, output);
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     computationClass = WritableUtils.readClass(input);
-    messageCombinerClass = WritableUtils.readClass(input);
-    incomingMessageClass = WritableUtils.readClass(input);
-    outgoingMessageClass = WritableUtils.readClass(input);
+    incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
+    outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
   }
 
   @Override
   public String toString() {
     String computationName = computationClass == null ? "_not_set_" :
         computationClass.getName();
-    String combinerName = (messageCombinerClass == null) ? "null" :
-        messageCombinerClass.getName();
-    String incomingName = (incomingMessageClass == null) ? "null" :
-      incomingMessageClass.getName();
-    String outgoingName = (outgoingMessageClass == null) ? "null" :
-      outgoingMessageClass.getName();
-
-    return "(computation=" + computationName + ",combiner=" + combinerName +
-        ",incoming=" + incomingName + ",outgoing=" + outgoingName + ")";
+    return "(computation=" + computationName +
+        ",incoming=" + incomingMessageClasses +
+        ",outgoing=" + outgoingMessageClasses + ")";
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
new file mode 100644
index 0000000..0b9daf7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Marker type indicating that no messages will be sent in a
+ * particular superstep.
+ * NullWritable cannot be used for this, because NullWritable is itself a
+ * valid message type, sent as a signal (a vertex either receives it or not).
+ */
+public class NoMessage implements Writable {
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new IllegalStateException("NoMessage should never be read");
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new IllegalStateException("NoMessage should never be written");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index daad860..1544984 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Stores vertex id and message pairs in a single byte array.
  *
@@ -37,7 +37,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
   M extends Writable> extends ByteArrayVertexIdData<I, M>
   implements VertexIdMessages<I, M> {
   /** Message value class */
-  private MessageValueFactory<M> messageValueFactory;
+  private final MessageValueFactory<M> messageValueFactory;
   /** Add the message size to the stream? (Depends on the message store) */
   private boolean useMessageSizeEncoding = false;
 
@@ -57,7 +57,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
    * de-serialized right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
-    if (!getConf().useMessageCombiner()) {
+    if (!getConf().useOutgoingMessageCombiner()) {
       useMessageSizeEncoding = getConf().useMessageSizeEncoding();
     } else {
       useMessageSizeEncoding = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index f5f0fb9..028f9e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.utils;
 
+import java.lang.reflect.Modifier;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.jodah.typetools.TypeResolver;
 
@@ -111,4 +113,48 @@ public class ReflectionUtils {
     ConfigurationUtils.configureIfPossible(result, configuration);
     return result;
   }
+
+  /**
+   * Verify that the found type matches the expected type. If the types don't
+   * match, an {@link IllegalStateException} will be thrown.
+   *
+   * @param concreteChild Concrete child type
+   * @param parent Parent type
+   * @param typeDesc String description of the type (for exception description)
+   * @param mainClass Class in which the actual type was found (for exception
+   *                  description)
+   */
+  public static void verifyTypes(Class<?> concreteChild, Class<?> parent,
+      String typeDesc, Class<?> mainClass) {
+    // unknown means object
+    if (parent == TypeResolver.Unknown.class) {
+      parent = Object.class;
+    }
+
+    verifyConcrete(concreteChild, typeDesc);
+
+    if (!parent.isAssignableFrom(concreteChild)) {
+      throw new IllegalStateException("verifyTypes: " + typeDesc + " types " +
+          "don't match, in " + mainClass.getName() + " " + concreteChild +
+          " expected, but " + parent + " found");
+    }
+  }
+
+  /**
+   * Verify that given type is a concrete type that can be instantiated.
+   *
+   * @param concrete type to check
+   * @param typeDesc String description of the type (for exception description)
+   */
+  public static void verifyConcrete(
+      Class<?> concrete, String typeDesc) {
+    if (concrete.isInterface()) {
+      throw new IllegalStateException("verifyTypes: " +
+          "Type " + typeDesc + " must be concrete class " + concrete);
+    }
+    if (Modifier.isAbstract(concrete.getModifiers())) {
+      throw new IllegalStateException("verifyTypes: " +
+          "Type " + typeDesc + "can't be abstract class" + concrete);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f614a33..2f1c2ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -931,7 +931,8 @@ public class BspServiceWorker<I extends WritableComparable,
     waitForOtherWorkers(superstepFinishedNode);
 
     GlobalStats globalStats = new GlobalStats();
-    SuperstepClasses superstepClasses = new SuperstepClasses();
+    SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+        getConfiguration());
     WritableUtils.readFieldsFromZnode(
         getZkExt(), superstepFinishedNode, false, null, globalStats,
         superstepClasses);
@@ -1659,7 +1660,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
       // Load global stats and superstep classes
       GlobalStats globalStats = new GlobalStats();
-      SuperstepClasses superstepClasses = new SuperstepClasses();
+      SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+          getConfiguration());
       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
       DataInputStream finalizedStream =
@@ -1717,7 +1719,8 @@ else[HADOOP_NON_SECURE]*/
     Collections.shuffle(randomEntryList);
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
-            getConfiguration(), this);
+            getConfiguration(), this,
+            false /* useOneMessageToManyIdsEncoding */);
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
       randomEntryList) {
       for (Integer partitionId : workerPartitionList.getValue()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 10b1a25..7b2fc0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -18,6 +18,11 @@
 
 package org.apache.giraph.worker;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -43,11 +48,6 @@ import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.util.PercentGauge;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
 /**
  * Abstract base class for loading vertex/edge input splits.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -103,7 +103,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     this.context = context;
     this.workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
-            context, configuration, bspServiceWorker);
+            context, configuration, bspServiceWorker,
+            false /* useOneMessageToManyIdsEncoding, not useful for input */);
     this.useLocality = configuration.useInputSplitLocality();
     this.splitsHandler = splitsHandler;
     this.configuration = configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index eb2497b..bf20580 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -18,21 +18,38 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
 import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
+import org.apache.giraph.conf.DefaultMessageClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.factories.TestMessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.CollectionUtils;
@@ -46,23 +63,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
 
 /** Test for different types of message stores */
 public class TestMessageStores {
@@ -220,7 +224,12 @@ public class TestMessageStores {
     }
     out.close();
 
-    messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+    messageStore = messageStoreFactory.newStore(
+        new DefaultMessageClasses(
+            IntWritable.class,
+            DefaultMessageValueFactory.class,
+            null,
+            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(
         (new FileInputStream(file))));
@@ -240,7 +249,12 @@ public class TestMessageStores {
       TestData testData) throws IOException {
     SortedMap<IntWritable, Collection<IntWritable>> messages =
         new TreeMap<IntWritable, Collection<IntWritable>>();
-    S messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+    S messageStore = messageStoreFactory.newStore(
+        new DefaultMessageClasses(
+            IntWritable.class,
+            DefaultMessageValueFactory.class,
+            null,
+            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
     putNTimes(messageStore, messages, testData);
     assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
index 4e38bff..57529f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
@@ -18,6 +18,11 @@
 
 package org.apache.giraph.conf;
 
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -31,12 +36,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import static org.junit.Assert.assertEquals;
-
 /**
  * Benchmark tests to insure that object creation via
  * {@link ImmutableClassesGiraphConfiguration} is fast
@@ -49,7 +48,7 @@ public class TestObjectCreation {
   private long startNanos = -1;
   private long totalNanos = -1;
   private long total = 0;
-  private long expected = COUNT * (COUNT - 1) / 2L;
+  private final long expected = COUNT * (COUNT - 1) / 2L;
   private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
       LongWritable> configuration;
 
@@ -59,7 +58,6 @@ public class TestObjectCreation {
     GiraphConstants.VERTEX_ID_CLASS.set(conf, IntWritable.class);
     GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
     GiraphConstants.EDGE_VALUE_CLASS.set(conf, DoubleWritable.class);
-    GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
     GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
     conf.setComputationClass(LongNoOpComputation.class);
     configuration =

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
index 721a74c..272666c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
@@ -18,10 +18,15 @@
 
 package org.apache.giraph.io;
 
-import com.google.common.collect.Maps;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.factories.VertexValueFactory;
@@ -39,10 +44,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.*;
+import com.google.common.collect.Maps;
 
 /**
  * A test case to ensure that loading a graph from vertices and edges works as
@@ -349,14 +351,6 @@ public class TestVertexEdgeInput extends BspCase {
   public static class TestVertexValueFactory
       implements VertexValueFactory<IntWritable> {
     @Override
-    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
-    @Override
-    public Class<IntWritable> getValueClass() {
-      return IntWritable.class;
-    }
-
-    @Override
     public IntWritable newInstance() {
       return new IntWritable(3);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index 4a8caaa..194bb5e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.master;
 
+import java.io.IOException;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -32,74 +34,77 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.junit.Test;
 
-import java.io.IOException;
-
 /** Test type verification when switching computation and combiner types */
 public class TestComputationCombinerTypes {
+  private void testConsecutiveComp(
+      Class<? extends Computation> firstComputationClass,
+      Class<? extends Computation> secondComputationClass) {
+    testConsecutiveComp(firstComputationClass, secondComputationClass, null);
+  }
+
+  private void testConsecutiveComp(
+      Class<? extends Computation> firstComputationClass,
+      Class<? extends Computation> secondComputationClass,
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    ImmutableClassesGiraphConfiguration conf =
+        createConfiguration(firstComputationClass);
+    SuperstepClasses classes = SuperstepClasses.createAndExtractTypes(conf);
+    classes.setComputationClass(secondComputationClass);
+    classes.setMessageCombinerClass(messageCombinerClass);
+    classes.verifyTypesMatch(true);
+  }
+
   @Test
   public void testAllMatchWithoutCombiner() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntNoOpComputation.class, null);
-    classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class), true);
+    testConsecutiveComp(IntNoOpComputation.class, IntNoOpComputation.class);
   }
 
   @Test
   public void testAllMatchWithCombiner() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntDoubleMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        IntDoubleMessageCombiner.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentIdTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(LongIntIntLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, LongIntIntLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentVertexValueTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntLongIntLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, IntLongIntLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentEdgeDataTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntLongLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, IntIntLongLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentMessageTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntIntLongComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntLongDoubleComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntLongDoubleComputation.class, IntIntIntIntLongComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentCombinerIdType() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            DoubleDoubleMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        DoubleDoubleMessageCombiner.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentCombinerMessageType() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntLongMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        IntLongMessageCombiner.class);
   }
 
   private static ImmutableClassesGiraphConfiguration createConfiguration(

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 833061e..8a034c2 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -140,9 +140,6 @@ public class TestSwitchClasses {
           break;
         case 4:
           setComputation(Computation1.class);
-          // message types removed
-          setIncomingMessage(null);
-          setOutgoingMessage(null);
           break;
         default:
           haltComputation();


Mime
View raw message