incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject svn commit: r1234997 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ src/test/java/org/apache/giraph/examples/
Date Mon, 23 Jan 2012 21:33:04 GMT
Author: claudio
Date: Mon Jan 23 21:33:04 2012
New Revision: 1234997

URL: http://svn.apache.org/viewvc?rev=1234997&view=rev
Log:
GIRAPH-124: Combiner should return Iterable<M> instead of M or null.

Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Jan 23 21:33:04 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.1.0 - unreleased
   
+  GIRAPH-124: Combiner should return Iterable<M> instead of M or 
+  null. (claudio)
+
   GIRAPH-125: Bug in LongDoubleFloatDoubleVertex.sendMsgToAllEdges().
   (humming80 via aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Mon Jan 23 21:33:04 2012
@@ -59,6 +59,8 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.utils.MemoryUtils;
 
+import com.google.common.collect.Iterables;
+
 /*if[HADOOP_FACEBOOK]
 import org.apache.hadoop.ipc.ProtocolSignature;
 end[HADOOP_FACEBOOK]*/
@@ -235,10 +237,23 @@ public abstract class BasicRPCCommunicat
                             }
                         }
                         if (combiner != null && entry.getValue().size() > 1) {
-                            M combinedMsg = combiner.combine(entry.getKey(),
-                                                             entry.getValue());
+                            Iterable<M> messages = combiner.combine(
+                                    entry.getKey(), entry.getValue());
+                            if (messages == null) {
+                                throw new IllegalStateException(
+                                        "run: Combiner cannot return null");
+                            }
+                            if (Iterables.size(entry.getValue()) < 
+                                    Iterables.size(messages)) {
+                                throw new IllegalStateException(
+                                        "run: The number of combined " +
+                                        "messages is required to be <= to " + 
+                                        "number of messages to be combined");
+                            }
                             entry.getValue().clear();
-                            entry.getValue().add(combinedMsg);
+                            for (M msg: messages) {
+                                entry.getValue().add(msg);
+                            }
                         }
                         if (entry.getValue().isEmpty()) {
                             throw new IllegalStateException(
@@ -347,10 +362,21 @@ public abstract class BasicRPCCommunicat
                     peerConnection.getRPCProxy();
 
                 if (combiner != null) {
-                    M combinedMsg = combiner.combine(destVertex,
-                                                     outMessageList);
-                    if (combinedMsg != null) {
-                        proxy.putMsg(destVertex, combinedMsg);
+                    Iterable<M> messages = combiner.combine(destVertex,
+                                                            outMessageList);
+                    if (messages == null) {
+                        throw new IllegalStateException(
+                                "run: Combiner cannot return null");
+                    }
+                    if (Iterables.size(outMessageList) < 
+                            Iterables.size(messages)) {
+                        throw new IllegalStateException(
+                                "run: The number of combined messages is " +
+                                "required to be <= to the number of " +
+                                "messages to be combined");
+                    }
+                    for (M msg: messages) {
+                        proxy.putMsg(destVertex, msg);
                     }
                 } else {
                     proxy.putMsgList(destVertex, outMessageList);
@@ -971,10 +997,24 @@ end[HADOOP_FACEBOOK]*/
             for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
                 if (combiner != null) {
                     try {
-                        M combinedMsg = combiner.combine(entry.getKey(),
-                                                         entry.getValue());
-                        if (combinedMsg != null) {
-                            putMsg(entry.getKey(), combinedMsg);
+                        Iterable<M> messages = 
+                            combiner.combine(entry.getKey(), 
+                                             entry.getValue());
+                        if (messages == null) {
+                            throw new IllegalStateException(
+                                    "prepareSuperstep: Combiner cannot " +
+                                    "return null");
+                        }
+                        if (Iterables.size(entry.getValue()) < 
+                                Iterables.size(messages)) {
+                            throw new IllegalStateException(
+                                    "prepareSuperstep: The number of " +
+                                    "combined messages is " +
+                                    "required to be <= to the number of " +
+                                    "messages to be combined");
+                        }
+                        for (M msg: messages) {
+                            putMsg(entry.getKey(), msg);
                         }
                     } catch (IOException e) {
                         // no actual IO -- should never happen

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java Mon Jan 23 21:33:04 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.graph.VertexCom
 import org.apache.hadoop.io.IntWritable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -31,7 +32,7 @@ public class MinimumIntCombiner
         extends VertexCombiner<IntWritable, IntWritable> {
 
     @Override
-    public IntWritable combine(IntWritable target,
+    public Iterable<IntWritable> combine(IntWritable target,
     		Iterable<IntWritable> messages) throws IOException {
         int minimum = Integer.MAX_VALUE;
         for (IntWritable message : messages) {
@@ -39,6 +40,9 @@ public class MinimumIntCombiner
                 minimum = message.get();
             }
         }
-        return new IntWritable(minimum);
+        List<IntWritable> value = new ArrayList<IntWritable>();
+        value.add(new IntWritable(minimum));
+        
+        return value;
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java Mon Jan 23 21:33:04 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.examples;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.IntWritable;
@@ -33,13 +34,15 @@ public class SimpleSumCombiner
         extends VertexCombiner<LongWritable, IntWritable> {
 
     @Override
-    public IntWritable combine(LongWritable vertexIndex,
-                               Iterable<IntWritable> messages)
-            throws IOException {
+    public Iterable<IntWritable> combine(LongWritable vertexIndex,
+            Iterable<IntWritable> messages) throws IOException {
         int sum = 0;
         for (IntWritable msg : messages) {
             sum += msg.get();
         }
-        return new IntWritable(sum);
+        List<IntWritable> value = new ArrayList<IntWritable>();
+        value.add(new IntWritable(sum));
+        
+        return value;
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java Mon Jan 23 21:33:04 2012
@@ -33,15 +33,16 @@ import org.apache.hadoop.io.WritableComp
 public abstract class VertexCombiner<I extends WritableComparable,
                                      M extends Writable> {
 
-  /**
-   * Combines message values for a particular vertex index.
-   *
-   * @param vertexIndex Index of the vertex getting these messages
-   * @param messages Iterable of the messages to be combined
-   * @return Message that is combined from {@link messages} or null if no
-   *         message it to be sent
-   * @throws IOException
-   */
-   public abstract M combine(I vertexIndex,
-                             Iterable<M> messages) throws IOException;
+   /**
+    * Combines message values for a particular vertex index.
+    *
+    * @param vertexIndex Index of the vertex getting these messages
+    * @param messages Iterable of the messages to be combined
+    * @return Iterable of the combined messages. The returned value cannot 
+    *         be null and its size is required to be smaller or equal to 
+    *         the size of {@link messages}.
+    * @throws IOException
+    */
+    public abstract Iterable<M> combine(I vertexIndex,
+            Iterable<M> messages) throws IOException;
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java Mon Jan 23 21:33:04 2012
@@ -35,11 +35,13 @@ import org.apache.giraph.graph.GraphMapp
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.lib.JsonBase64VertexInputFormat;
 import org.apache.giraph.lib.JsonBase64VertexOutputFormat;
+import org.apache.giraph.utils.EmptyIterable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 
 
 public class TestVertexTypes
@@ -82,10 +84,9 @@ public class TestVertexTypes
             VertexCombiner<LongWritable, FloatWritable> {
 
         @Override
-        public FloatWritable combine(LongWritable vertexIndex,
-                                  Iterable<FloatWritable> msgList)
-                throws IOException {
-            return null;
+        public Iterable<FloatWritable> combine(LongWritable vertexIndex,
+                Iterable<FloatWritable> msgList) throws IOException {
+            return new EmptyIterable<FloatWritable>();
         }
     }
 
@@ -96,10 +97,10 @@ public class TestVertexTypes
             VertexCombiner<LongWritable, DoubleWritable> {
 
         @Override
-        public DoubleWritable combine(LongWritable vertexIndex,
-                                      Iterable<DoubleWritable> msgList)
+        public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
+                Iterable<DoubleWritable> msgList)
                 throws IOException {
-            return null;
+            return new EmptyIterable<DoubleWritable>();
         }
     }
 

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java Mon Jan 23 21:33:04 2012
@@ -22,6 +22,9 @@ import junit.framework.TestCase;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.hadoop.io.IntWritable;
 
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
 import java.util.Arrays;
 
 public class MinimumIntCombinerTest extends TestCase {
@@ -31,10 +34,11 @@ public class MinimumIntCombinerTest exte
         VertexCombiner<IntWritable, IntWritable> combiner =
                 new MinimumIntCombiner();
 
-        IntWritable result = combiner.combine(new IntWritable(1), Arrays.asList(
+        Iterable<IntWritable> result = combiner.combine(
+                new IntWritable(1), Arrays.asList(
                 new IntWritable(39947466), new IntWritable(199),
                 new IntWritable(19998888), new IntWritable(42)));
-
-        assertEquals(42, result.get());
+        assertTrue(result.iterator().hasNext());
+        assertEquals(42, result.iterator().next().get());
     }
 }



Mime
View raw message