giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 9a232d1
Date Fri, 14 Feb 2014 22:17:27 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 94033dbbd -> 9a232d185


GIRAPH-848: Allowing plain computation with types being configurable (ikabiljo via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9a232d18
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9a232d18
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9a232d18

Branch: refs/heads/trunk
Commit: 9a232d185eb97a523359e2e2e73a869a4af15a85
Parents: 94033db
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Fri Feb 14 14:15:53 2014 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Fri Feb 14 14:15:53 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../ImmutableClassesGiraphConfiguration.java    |  20 ++--
 .../org/apache/giraph/master/MasterCompute.java |  20 +++-
 .../apache/giraph/master/SuperstepClasses.java  | 101 +++++++++++++++----
 .../apache/giraph/master/TestSwitchClasses.java |  31 +++---
 5 files changed, 131 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 32928f9..5ec1cc6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-848: Allowing plain computation with types being configurable (ikabiljo via majakabiljo)
+
   GIRAPH-854: fix for test fail due to GIRAPH-840 (pavanka via majakabiljo)
 
   GIRAPH-853: Fix concurrency issue in GiraphMetrics (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index b33938a..2e8c935 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -34,8 +34,8 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -76,8 +76,6 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Progressable;
 
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
  * Classes are immutable and final to provide the best performance for
@@ -1021,16 +1019,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     Class<? extends Computation> computationClass =
         superstepClasses.getComputationClass();
     classes.setComputationClass(computationClass);
-    if (computationClass != null) {
-      Class<?>[] classList =
-          getTypeArguments(TypesHolder.class, computationClass);
-
-      Class<? extends Writable> incomingMsgValueClass =
-          (Class<? extends Writable>) classList[3];
+    Class<? extends Writable> incomingMsgValueClass =
+        superstepClasses.getIncomingMessageClass();
+    if (incomingMsgValueClass != null) {
       classes.setIncomingMessageValueClass(incomingMsgValueClass);
-
-      Class<? extends Writable> outgoingMsgValueClass =
-          (Class<? extends Writable>) classList[4];
+    }
+    Class<? extends Writable> outgoingMsgValueClass =
+        superstepClasses.getOutgoingMessageClass();
+    if (outgoingMsgValueClass != null) {
       classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
     }
     classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());

http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 287fdb9..d77a9b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.master;
 
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
@@ -169,6 +169,24 @@ public abstract class MasterCompute
     return superstepClasses.getMessageCombinerClass();
   }
 
+  /**
+   * Set incoming message class to be used
+   * @param incomingMessageClass incoming message class
+   */
+  public final void setIncomingMessage(
+      Class<? extends Writable> incomingMessageClass) {
+    superstepClasses.setIncomingMessageClass(incomingMessageClass);
+  }
+
+  /**
+   * Set outgoing message class to be used
+   * @param outgoingMessageClass outgoing message class
+   */
+  public final void setOutgoingMessage(
+      Class<? extends Writable> outgoingMessageClass) {
+    superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
+  }
+
   @Override
   public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)

http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 8344910..8145109 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -18,6 +18,13 @@
 
 package org.apache.giraph.master;
 
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.TypesHolder;
@@ -26,22 +33,23 @@ import org.apache.giraph.graph.Language;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Modifier;
-
-import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
+import org.apache.log4j.Logger;
 
 /**
  * Holds Computation and MessageCombiner class.
  */
 public class SuperstepClasses implements Writable {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
+
   /** Computation class to be used in the following superstep */
   private Class<? extends Computation> computationClass;
   /** MessageCombiner class to be used in the following superstep */
   private Class<? extends MessageCombiner> messageCombinerClass;
+  /** Incoming message class to be used in the following superstep */
+  private Class<? extends Writable> incomingMessageClass;
+  /** Outgoing message class to be used in the following superstep */
+  private Class<? extends Writable> outgoingMessageClass;
 
   /**
    * Default constructor
@@ -80,6 +88,38 @@ public class SuperstepClasses implements Writable {
     return messageCombinerClass;
   }
 
+  /**
+   * Get incoming message class, either set directly, or through Computation
+   * @return incoming message class
+   */
+  public Class<? extends Writable> getIncomingMessageClass() {
+    if (incomingMessageClass != null) {
+      return incomingMessageClass;
+    }
+    if (computationClass == null) {
+      return null;
+    }
+    Class[] computationTypes = ReflectionUtils.getTypeArguments(
+        TypesHolder.class, computationClass);
+    return computationTypes[3];
+  }
+
+  /**
+   * Get outgoing message class, either set directly, or through Computation
+   * @return outgoing message class
+   */
+  public Class<? extends Writable> getOutgoingMessageClass() {
+    if (outgoingMessageClass != null) {
+      return outgoingMessageClass;
+    }
+    if (computationClass == null) {
+      return null;
+    }
+    Class[] computationTypes = ReflectionUtils.getTypeArguments(
+        TypesHolder.class, computationClass);
+    return computationTypes[4];
+  }
+
   public void setComputationClass(
       Class<? extends Computation> computationClass) {
     this.computationClass = computationClass;
@@ -87,8 +127,17 @@ public class SuperstepClasses implements Writable {
 
   public void setMessageCombinerClass(
       Class<? extends MessageCombiner> messageCombinerClass) {
-    this.messageCombinerClass =
-        messageCombinerClass;
+    this.messageCombinerClass = messageCombinerClass;
+  }
+
+  public void setIncomingMessageClass(
+      Class<? extends Writable> incomingMessageClass) {
+    this.incomingMessageClass = incomingMessageClass;
+  }
+
+  public void setOutgoingMessageClass(
+      Class<? extends Writable> outgoingMessageClass) {
+    this.outgoingMessageClass = outgoingMessageClass;
   }
 
   /**
@@ -118,11 +167,13 @@ public class SuperstepClasses implements Writable {
     verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
         "Edge value", computationClass);
 
+    Class<?> incomingMessageType = getIncomingMessageClass();
+    Class<?> outgoingMessageType = getOutgoingMessageClass();
+
     if (checkMatchingMesssageTypes) {
-      verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3],
-          "Previous outgoing and new incoming message", computationClass);
+      verifyTypes(incomingMessageType, conf.getOutgoingMessageValueClass(),
+          "New incoming and previous outgoing message", computationClass);
     }
-    Class<?> outgoingMessageType = computationTypes[4];
     if (outgoingMessageType.isInterface()) {
       throw new IllegalStateException("verifyTypesMatch: " +
           "Message type must be concrete class " + outgoingMessageType);
@@ -154,9 +205,15 @@ public class SuperstepClasses implements Writable {
   private void verifyTypes(Class<?> expected, Class<?> actual,
       String typeDesc, Class<?> mainClass) {
     if (!expected.equals(actual)) {
-      throw new IllegalStateException("verifyTypes: " + typeDesc + " types " +
-          "don't match, in " + mainClass.getName() + " " + expected +
+      if (actual.isAssignableFrom(expected)) {
+        LOG.warn("verifyTypes: proceeding with assignable types : " +
+          typeDesc + " types, in " + mainClass.getName() + " " + expected +
           " expected, but " + actual + " found");
+      } else {
+        throw new IllegalStateException("verifyTypes: " + typeDesc +
+            " types " + "don't match, in " + mainClass.getName() + " " +
+            expected + " expected, but " + actual + " found");
+      }
     }
   }
 
@@ -164,20 +221,30 @@ public class SuperstepClasses implements Writable {
   public void write(DataOutput output) throws IOException {
     WritableUtils.writeClass(computationClass, output);
     WritableUtils.writeClass(messageCombinerClass, output);
+    WritableUtils.writeClass(incomingMessageClass, output);
+    WritableUtils.writeClass(outgoingMessageClass, output);
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     computationClass = WritableUtils.readClass(input);
     messageCombinerClass = WritableUtils.readClass(input);
+    incomingMessageClass = WritableUtils.readClass(input);
+    outgoingMessageClass = WritableUtils.readClass(input);
   }
 
   @Override
   public String toString() {
     String computationName = computationClass == null ? "_not_set_" :
         computationClass.getName();
-    return "(computation=" + computationName + ",combiner=" +
-        ((messageCombinerClass == null) ? "null" :
-            messageCombinerClass.getName()) + ")";
+    String combinerName = (messageCombinerClass == null) ? "null" :
+        messageCombinerClass.getName();
+    String incomingName = (incomingMessageClass == null) ? "null" :
+      incomingMessageClass.getName();
+    String outgoingName = (outgoingMessageClass == null) ? "null" :
+      outgoingMessageClass.getName();
+
+    return "(computation=" + computationName + ",combiner=" + combinerName +
+        ",incoming=" + incomingName + ",outgoing=" + outgoingName + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 29335af..833061e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -18,6 +18,14 @@
 
 package org.apache.giraph.master;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import junit.framework.Assert;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.graph.AbstractComputation;
@@ -32,14 +40,6 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import junit.framework.Assert;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-
 /** Test switching Computation and MessageCombiner class during application */
 public class TestSwitchClasses {
   @Test
@@ -135,9 +135,14 @@ public class TestSwitchClasses {
         case 3:
           setComputation(Computation3.class);
           setMessageCombiner(SumMessageCombiner.class);
+          setIncomingMessage(DoubleWritable.class);
+          setOutgoingMessage(IntWritable.class);
           break;
         case 4:
           setComputation(Computation1.class);
+          // message types removed
+          setIncomingMessage(null);
+          setOutgoingMessage(null);
           break;
         default:
           haltComputation();
@@ -178,11 +183,11 @@ public class TestSwitchClasses {
   }
 
   public static class Computation3 extends AbstractComputation<IntWritable,
-        StatusValue, IntWritable, DoubleWritable, IntWritable> {
+        StatusValue, IntWritable, Writable, Writable> {
     @Override
     public void compute(
         Vertex<IntWritable, StatusValue, IntWritable> vertex,
-        Iterable<DoubleWritable> messages) throws IOException {
+        Iterable<Writable> messages) throws IOException {
       vertex.getValue().computations.add(3);
       vertex.getValue().addDoubleMessages(messages);
 
@@ -238,10 +243,10 @@ public class TestSwitchClasses {
       messagesReceived.add(messagesList);
     }
 
-    public void addDoubleMessages(Iterable<DoubleWritable> messages) {
+    public void addDoubleMessages(Iterable<Writable> messages) {
       HashSet<Double> messagesList = new HashSet<Double>();
-      for (DoubleWritable message : messages) {
-        messagesList.add(message.get());
+      for (Writable message : messages) {
+        messagesList.add(((DoubleWritable)message).get());
       }
       messagesReceived.add(messagesList);
     }


Mime
View raw message