giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1365352 [2/4] - in /giraph/trunk: ./ giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/ giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/ giraph-formats-contrib/src/test/java/org/apache...
Date Tue, 24 Jul 2012 23:37:45 GMT
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java Tue Jul 24 23:37:42 2012
@@ -18,11 +18,9 @@
 
 package org.apache.giraph.examples;
 
+import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.graph.EdgeListVertex;
-
-import java.util.Iterator;
 
 /**
  * User applications can subclass IdentityVertex, which
@@ -41,7 +39,7 @@ public abstract class IdentityVertex<I e
   extends EdgeListVertex<I, V, E, M> {
 
   @Override
-  public void compute(Iterator<M> msgIterator) {
+  public void compute(Iterable<M> messages) {
     voteToHalt();
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Tue Jul 24 23:37:42 2012
@@ -18,10 +18,8 @@
 
 package org.apache.giraph.examples;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.lib.TextVertexInputFormat;
 import org.apache.hadoop.io.IntWritable;
@@ -32,6 +30,9 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -74,9 +75,9 @@ public class IntIntNullIntTextInputForma
     }
 
     @Override
-    public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+    public Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
     getCurrentVertex() throws IOException, InterruptedException {
-      BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+      Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
       vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
       IntWritable>createVertex(getContext().getConfiguration());
 

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java?rev=1365352&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java Tue Jul 24 23:37:42 2012
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link VertexCombiner} that finds the minimum {@link DoubleWritable}
+ */
+public class MinimumDoubleCombiner extends
+    VertexCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public Iterable<DoubleWritable> combine(
+      LongWritable target,
+      Iterable<DoubleWritable> messages) throws IOException {
+    double minimum = Double.MAX_VALUE;
+    for (DoubleWritable message : messages) {
+      if (message.get() < minimum) {
+        minimum = message.get();
+      }
+    }
+    List<DoubleWritable> value = new ArrayList<DoubleWritable>();
+    value.add(new DoubleWritable(minimum));
+
+    return value;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Jul 24 23:37:42 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.WorkerContext;
@@ -37,8 +38,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-import java.util.Iterator;
-
 /**
  * An example that simply uses its id, value, and edges to compute new data
  * every iteration to verify that checkpoint restarting works.  Fault injection
@@ -64,7 +63,7 @@ public class SimpleCheckpointVertex exte
   private Configuration conf;
 
   @Override
-  public void compute(Iterator<FloatWritable> msgIterator) {
+  public void compute(Iterable<FloatWritable> messages) {
     SimpleCheckpointVertexWorkerContext workerContext =
         (SimpleCheckpointVertexWorkerContext) getWorkerContext();
 
@@ -76,7 +75,7 @@ public class SimpleCheckpointVertex exte
 
     if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
         (getContext().getTaskAttemptID().getId() == 0) &&
-        (getVertexId().get() == FAULTING_VERTEX_ID)) {
+        (getId().get() == FAULTING_VERTEX_ID)) {
       LOG.info("compute: Forced a fault on the first " +
           "attempt of superstep " +
           FAULTING_SUPERSTEP + " and vertex id " +
@@ -88,37 +87,34 @@ public class SimpleCheckpointVertex exte
       return;
     }
     LOG.info("compute: " + sumAggregator);
-    sumAggregator.aggregate(getVertexId().get());
+    sumAggregator.aggregate(getId().get());
     LOG.info("compute: sum = " +
         sumAggregator.getAggregatedValue().get() +
-        " for vertex " + getVertexId());
+        " for vertex " + getId());
     float msgValue = 0.0f;
-    while (msgIterator.hasNext()) {
-      float curMsgValue = msgIterator.next().get();
+    for (FloatWritable message : messages) {
+      float curMsgValue = message.get();
       msgValue += curMsgValue;
       LOG.info("compute: got msgValue = " + curMsgValue +
-          " for vertex " + getVertexId() +
+          " for vertex " + getId() +
           " on superstep " + getSuperstep());
     }
-    int vertexValue = getVertexValue().get();
-    setVertexValue(new IntWritable(vertexValue + (int) msgValue));
-    LOG.info("compute: vertex " + getVertexId() +
-        " has value " + getVertexValue() +
+    int vertexValue = getValue().get();
+    setValue(new IntWritable(vertexValue + (int) msgValue));
+    LOG.info("compute: vertex " + getId() +
+        " has value " + getValue() +
         " on superstep " + getSuperstep());
-    for (Iterator<LongWritable> edges = getOutEdgesIterator();
-         edges.hasNext();) {
-      LongWritable targetVertexId = edges.next();
-      FloatWritable edgeValue = getEdgeValue(targetVertexId);
-      LOG.info("compute: vertex " + getVertexId() +
-          " sending edgeValue " + edgeValue +
+    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+      FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+          (float) vertexValue);
+      LOG.info("compute: vertex " + getId() +
+          " sending edgeValue " + edge.getValue() +
           " vertexValue " + vertexValue +
-          " total " + (edgeValue.get() +
-              (float) vertexValue) +
-              " to vertex " + targetVertexId +
+          " total " + newEdgeValue +
+              " to vertex " + edge.getTargetVertexId() +
               " on superstep " + getSuperstep());
-      edgeValue.set(edgeValue.get() + (float) vertexValue);
-      addEdge(targetVertexId, edgeValue);
-      sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
+      addEdge(edge.getTargetVertexId(), newEdgeValue);
+      sendMessage(edge.getTargetVertexId(), newEdgeValue);
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java Tue Jul 24 23:37:42 2012
@@ -18,15 +18,12 @@
 
 package org.apache.giraph.examples;
 
-import java.util.Iterator;
-
+import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
-import org.apache.giraph.graph.EdgeListVertex;
-
 /**
  * Test whether messages can go through a combiner.
  */
@@ -36,20 +33,20 @@ public class SimpleCombinerVertex extend
   private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
 
   @Override
-  public void compute(Iterator<IntWritable> msgIterator) {
-    if (getVertexId().equals(new LongWritable(2))) {
-      sendMsg(new LongWritable(1), new IntWritable(101));
-      sendMsg(new LongWritable(1), new IntWritable(102));
-      sendMsg(new LongWritable(1), new IntWritable(103));
+  public void compute(Iterable<IntWritable> messages) {
+    if (getId().equals(new LongWritable(2))) {
+      sendMessage(new LongWritable(1), new IntWritable(101));
+      sendMessage(new LongWritable(1), new IntWritable(102));
+      sendMessage(new LongWritable(1), new IntWritable(103));
     }
-    if (!getVertexId().equals(new LongWritable(1))) {
+    if (!getId().equals(new LongWritable(1))) {
       voteToHalt();
     } else {
       // Check the messages
       int sum = 0;
       int num = 0;
-      while (msgIterator != null && msgIterator.hasNext()) {
-        sum += msgIterator.next().get();
+      for (IntWritable message : messages) {
+        sum += message.get();
         num++;
       }
       LOG.info("TestCombinerVertex: Received a sum of " + sum +

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java Tue Jul 24 23:37:42 2012
@@ -24,8 +24,6 @@ import org.apache.hadoop.io.FloatWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
-import java.util.Iterator;
-
 /**
  * Vertex to allow unit testing of failure detection
  */
@@ -37,18 +35,18 @@ public class SimpleFailVertex extends Ed
   private static long SUPERSTEP = 0;
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator) {
+  public void compute(Iterable<DoubleWritable> messages) {
     if (getSuperstep() >= 1) {
       double sum = 0;
-      while (msgIterator.hasNext()) {
-        sum += msgIterator.next().get();
+      for (DoubleWritable message : messages) {
+        sum += message.get();
       }
       DoubleWritable vertexValue =
-          new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
-      setVertexValue(vertexValue);
+          new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+      setValue(vertexValue);
       if (getSuperstep() < 30) {
         if (getSuperstep() == 20) {
-          if (getVertexId().get() == 10L) {
+          if (getId().get() == 10L) {
             try {
               Thread.sleep(2000);
             } catch (InterruptedException e) {
@@ -59,9 +57,9 @@ public class SimpleFailVertex extends Ed
             return;
           }
         }
-        long edges = getNumOutEdges();
-        sendMsgToAllEdges(
-            new DoubleWritable(getVertexValue().get() / edges));
+        long edges = getNumEdges();
+        sendMessageToAllEdges(
+            new DoubleWritable(getValue().get() / edges));
       } else {
         voteToHalt();
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java Tue Jul 24 23:37:42 2012
@@ -27,6 +27,6 @@ import org.apache.hadoop.io.LongWritable
  * lib.LongDoubleDoubleAdjacencyListVertexInputFormat
  */
 
-public class SimpleLongDoubleDoubleDoubleIdentityVertex extends
+public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends
   IdentityVertex<LongWritable, DoubleWritable,
   DoubleWritable, DoubleWritable> { }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Tue Jul 24 23:37:42 2012
@@ -28,7 +28,6 @@ import org.apache.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Demonstrates a computation with a centralized part implemented via a
@@ -42,13 +41,13 @@ public class SimpleMasterComputeVertex e
       Logger.getLogger(SimpleMasterComputeVertex.class);
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator) {
+  public void compute(Iterable<DoubleWritable> messages) {
     DoubleOverwriteAggregator agg =
         (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
-    double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get();
+    double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
     double newValue = agg.getAggregatedValue().get();
     double newSum = oldSum + newValue;
-    setVertexValue(new DoubleWritable(newSum));
+    setValue(new DoubleWritable(newSum));
     SimpleMasterComputeWorkerContext workerContext =
         (SimpleMasterComputeWorkerContext) getWorkerContext();
     workerContext.setFinalSum(newSum);

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Tue Jul 24 23:37:42 2012
@@ -18,15 +18,12 @@
 
 package org.apache.giraph.examples;
 
-import java.util.Iterator;
-
+import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
-import org.apache.giraph.graph.EdgeListVertex;
-
 /**
  * Test whether messages can be sent and received by vertices.
  */
@@ -35,19 +32,19 @@ public class SimpleMsgVertex extends
   /** Class logger */
   private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
   @Override
-  public void compute(Iterator<IntWritable> msgIterator) {
-    if (getVertexId().equals(new LongWritable(2))) {
-      sendMsg(new LongWritable(1), new IntWritable(101));
-      sendMsg(new LongWritable(1), new IntWritable(102));
-      sendMsg(new LongWritable(1), new IntWritable(103));
+  public void compute(Iterable<IntWritable> messages) {
+    if (getId().equals(new LongWritable(2))) {
+      sendMessage(new LongWritable(1), new IntWritable(101));
+      sendMessage(new LongWritable(1), new IntWritable(102));
+      sendMessage(new LongWritable(1), new IntWritable(103));
     }
-    if (!getVertexId().equals(new LongWritable(1))) {
+    if (!getId().equals(new LongWritable(1))) {
       voteToHalt();
     } else {
       /* Check the messages */
       int sum = 0;
-      while (msgIterator != null && msgIterator.hasNext()) {
-        sum += msgIterator.next().get();
+      for (IntWritable message : messages) {
+        sum += message.get();
       }
       LOG.info("TestMsgVertex: Received a sum of " + sum +
           " (will stop on 306)");

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Tue Jul 24 23:37:42 2012
@@ -18,18 +18,16 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
-import java.util.Iterator;
-
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
-import org.apache.giraph.graph.BasicVertex;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.WorkerContext;
+import java.io.IOException;
 
 /**
  * Vertex to allow unit testing of graph mutations.
@@ -55,7 +53,7 @@ public class SimpleMutateGraphVertex ext
   }
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator)
+  public void compute(Iterable<DoubleWritable> messages)
     throws IOException {
     SimpleMutateGraphVertexWorkerContext workerContext =
         (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
@@ -65,77 +63,77 @@ public class SimpleMutateGraphVertex ext
       // Send messages to vertices that are sure not to exist
       // (creating them)
       LongWritable destVertexId =
-          new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
-      sendMsg(destVertexId, new DoubleWritable(0.0));
+          new LongWritable(rangeVertexIdStart(1) + getId().get());
+      sendMessage(destVertexId, new DoubleWritable(0.0));
     } else if (getSuperstep() == 2) {
       LOG.debug("Reached superstep " + getSuperstep());
     } else if (getSuperstep() == 3) {
       long vertexCount = workerContext.getVertexCount();
-      if (vertexCount * 2 != getNumVertices()) {
+      if (vertexCount * 2 != getTotalNumVertices()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumVertices() +
+            "Impossible to have " + getTotalNumVertices() +
             " vertices when should have " + vertexCount * 2 +
             " on superstep " + getSuperstep());
       }
       long edgeCount = workerContext.getEdgeCount();
-      if (edgeCount != getNumEdges()) {
+      if (edgeCount != getTotalNumEdges()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumEdges() +
+            "Impossible to have " + getTotalNumEdges() +
             " edges when should have " + edgeCount +
             " on superstep " + getSuperstep());
       }
       // Create vertices that are sure not to exist (doubling vertices)
       LongWritable vertexIndex =
-          new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
-      BasicVertex<LongWritable, DoubleWritable,
-      FloatWritable, DoubleWritable> vertex =
-        instantiateVertex(vertexIndex, null, null, null);
+          new LongWritable(rangeVertexIdStart(3) + getId().get());
+      Vertex<LongWritable, DoubleWritable,
+            FloatWritable, DoubleWritable> vertex =
+        instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null);
       addVertexRequest(vertex);
       // Add edges to those remote vertices as well
       addEdgeRequest(vertexIndex,
           new Edge<LongWritable, FloatWritable>(
-              getVertexId(), new FloatWritable(0.0f)));
+              getId(), new FloatWritable(0.0f)));
     } else if (getSuperstep() == 4) {
       LOG.debug("Reached superstep " + getSuperstep());
     } else if (getSuperstep() == 5) {
       long vertexCount = workerContext.getVertexCount();
-      if (vertexCount * 2 != getNumVertices()) {
+      if (vertexCount * 2 != getTotalNumVertices()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumVertices() +
+            "Impossible to have " + getTotalNumVertices() +
             " when should have " + vertexCount * 2 +
             " on superstep " + getSuperstep());
       }
       long edgeCount = workerContext.getEdgeCount();
-      if (edgeCount + vertexCount != getNumEdges()) {
+      if (edgeCount + vertexCount != getTotalNumEdges()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumEdges() +
+            "Impossible to have " + getTotalNumEdges() +
             " edges when should have " + edgeCount + vertexCount +
             " on superstep " + getSuperstep());
       }
       // Remove the edges created in superstep 3
       LongWritable vertexIndex =
-          new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+          new LongWritable(rangeVertexIdStart(3) + getId().get());
       workerContext.increaseEdgesRemoved();
-      removeEdgeRequest(vertexIndex, getVertexId());
+      removeEdgeRequest(vertexIndex, getId());
     } else if (getSuperstep() == 6) {
       // Remove all the vertices created in superstep 3
-      if (getVertexId().compareTo(
+      if (getId().compareTo(
           new LongWritable(rangeVertexIdStart(3))) >= 0) {
-        removeVertexRequest(getVertexId());
+        removeVertexRequest(getId());
       }
     } else if (getSuperstep() == 7) {
       long origEdgeCount = workerContext.getOrigEdgeCount();
-      if (origEdgeCount != getNumEdges()) {
+      if (origEdgeCount != getTotalNumEdges()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumEdges() +
+            "Impossible to have " + getTotalNumEdges() +
             " edges when should have " + origEdgeCount +
             " on superstep " + getSuperstep());
       }
     } else if (getSuperstep() == 8) {
       long vertexCount = workerContext.getVertexCount();
-      if (vertexCount / 2 != getNumVertices()) {
+      if (vertexCount / 2 != getTotalNumVertices()) {
         throw new IllegalStateException(
-            "Impossible to have " + getNumVertices() +
+            "Impossible to have " + getTotalNumVertices() +
             " vertices when should have " + vertexCount / 2 +
             " on superstep " + getSuperstep());
       }
@@ -170,8 +168,8 @@ public class SimpleMutateGraphVertex ext
 
     @Override
     public void postSuperstep() {
-      vertexCount = getNumVertices();
-      edgeCount = getNumEdges();
+      vertexCount = getTotalNumVertices();
+      edgeCount = getTotalNumEdges();
       if (getSuperstep() == 1) {
         origEdgeCount = edgeCount;
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,12 @@
 
 package org.apache.giraph.examples;
 
-import com.google.common.collect.Maps;
-
 import org.apache.giraph.aggregators.DoubleMaxAggregator;
 import org.apache.giraph.aggregators.DoubleMinAggregator;
 import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.graph.WorkerContext;
@@ -40,8 +38,9 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -58,30 +57,31 @@ public class SimplePageRankVertex extend
       Logger.getLogger(SimplePageRankVertex.class);
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator) {
+  public void compute(Iterable<DoubleWritable> messages) {
     LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
     DoubleMinAggregator minAggreg = (DoubleMinAggregator) getAggregator("min");
     DoubleMaxAggregator maxAggreg = (DoubleMaxAggregator) getAggregator("max");
+
     if (getSuperstep() >= 1) {
       double sum = 0;
-      while (msgIterator.hasNext()) {
-        sum += msgIterator.next().get();
+      for (DoubleWritable message : messages) {
+        sum += message.get();
       }
       DoubleWritable vertexValue =
-          new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
-      setVertexValue(vertexValue);
+          new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+      setValue(vertexValue);
       maxAggreg.aggregate(vertexValue);
       minAggreg.aggregate(vertexValue);
       sumAggreg.aggregate(1L);
-      LOG.info(getVertexId() + ": PageRank=" + vertexValue +
+      LOG.info(getId() + ": PageRank=" + vertexValue +
           " max=" + maxAggreg.getAggregatedValue() +
           " min=" + minAggreg.getAggregatedValue());
     }
 
     if (getSuperstep() < MAX_SUPERSTEPS) {
-      long edges = getNumOutEdges();
-      sendMsgToAllEdges(
-          new DoubleWritable(getVertexValue().get() / edges));
+      long edges = getNumEdges();
+      sendMessageToAllEdges(
+          new DoubleWritable(getValue().get() / edges));
     } else {
       voteToHalt();
     }
@@ -150,11 +150,11 @@ public class SimplePageRankVertex extend
       if (getSuperstep() >= 3) {
         LOG.info("aggregatedNumVertices=" +
             sumAggreg.getAggregatedValue() +
-            " NumVertices=" + getNumVertices());
-        if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+            " NumVertices=" + getTotalNumVertices());
+        if (sumAggreg.getAggregatedValue().get() != getTotalNumVertices()) {
           throw new RuntimeException("wrong value of SumAggreg: " +
               sumAggreg.getAggregatedValue() + ", should be: " +
-              getNumVertices());
+              getTotalNumVertices());
         }
         DoubleWritable maxPagerank = maxAggreg.getAggregatedValue();
         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
@@ -194,27 +194,27 @@ public class SimplePageRankVertex extend
     }
 
     @Override
-    public BasicVertex<LongWritable, DoubleWritable,
-    FloatWritable, DoubleWritable>
+    public Vertex<LongWritable, DoubleWritable,
+        FloatWritable, DoubleWritable>
     getCurrentVertex() throws IOException {
-      BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+      Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
       vertex = BspUtils.createVertex(configuration);
 
       LongWritable vertexId = new LongWritable(
           (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
       DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
-      long destVertexId =
+      long targetVertexId =
           (vertexId.get() + 1) %
           (inputSplit.getNumSplits() * totalRecords);
       float edgeValue = vertexId.get() * 100f;
       Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
-      edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+      edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue));
       vertex.initialize(vertexId, vertexValue, edges, null);
       ++recordsRead;
       if (LOG.isInfoEnabled()) {
-        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-            ", vertexValue=" + vertex.getVertexValue() +
-            ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+        LOG.info("next: Return vertexId=" + vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
       }
       return vertex;
     }
@@ -252,11 +252,11 @@ public class SimplePageRankVertex extend
 
     @Override
     public void writeVertex(
-      BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+      Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
       throws IOException, InterruptedException {
       getRecordWriter().write(
-          new Text(vertex.getVertexId().toString()),
-          new Text(vertex.getVertexValue().toString()));
+          new Text(vertex.getId().toString()),
+          new Text(vertex.getValue().toString()));
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,13 @@
 
 package org.apache.giraph.examples;
 
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
-import java.util.Iterator;
-
 /**
  * Demonstrates the basic Pregel shortest paths implementation.
  */
@@ -50,37 +49,33 @@ public class SimpleShortestPathsVertex e
    * @return True if the source id
    */
   private boolean isSource() {
-    return getVertexId().get() ==
+    return getId().get() ==
         getContext().getConfiguration().getLong(SOURCE_ID,
             SOURCE_ID_DEFAULT);
   }
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator) {
+  public void compute(Iterable<DoubleWritable> messages) {
     if (getSuperstep() == 0) {
-      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
+      setValue(new DoubleWritable(Double.MAX_VALUE));
     }
     double minDist = isSource() ? 0d : Double.MAX_VALUE;
-    while (msgIterator.hasNext()) {
-      minDist = Math.min(minDist, msgIterator.next().get());
+    for (DoubleWritable message : messages) {
+      minDist = Math.min(minDist, message.get());
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
-          " vertex value = " + getVertexValue());
+      LOG.debug("Vertex " + getId() + " got minDist = " + minDist +
+          " vertex value = " + getValue());
     }
-    if (minDist < getVertexValue().get()) {
-      setVertexValue(new DoubleWritable(minDist));
-      for (Iterator<LongWritable> edges = getOutEdgesIterator();
-           edges.hasNext();) {
-        LongWritable targetVertexId = edges.next();
-        FloatWritable edgeValue = getEdgeValue(targetVertexId);
+    if (minDist < getValue().get()) {
+      setValue(new DoubleWritable(minDist));
+      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+        double distance = minDist + edge.getValue().get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Vertex " + getVertexId() + " sent to " +
-              targetVertexId + " = " +
-              (minDist + edgeValue.get()));
+          LOG.debug("Vertex " + getId() + " sent to " +
+              edge.getTargetVertexId() + " = " + distance);
         }
-        sendMsg(targetVertexId,
-            new DoubleWritable(minDist + edgeValue.get()));
+        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
       }
     }
     voteToHalt();

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Jul 24 23:37:42 2012
@@ -18,10 +18,9 @@
 
 package org.apache.giraph.examples;
 
-import com.google.common.collect.Maps;
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.lib.TextVertexOutputFormat;
@@ -35,8 +34,9 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -46,7 +46,7 @@ import java.util.Map;
 public class SimpleSuperstepVertex extends
     EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
   @Override
-  public void compute(Iterator<IntWritable> msgIterator) {
+  public void compute(Iterable<IntWritable> messages) {
     if (getSuperstep() > 3) {
       voteToHalt();
     }
@@ -74,11 +74,10 @@ public class SimpleSuperstepVertex exten
     }
 
     @Override
-    public BasicVertex<LongWritable, IntWritable, FloatWritable,
-    IntWritable> getCurrentVertex()
+    public Vertex<LongWritable, IntWritable, FloatWritable,
+        IntWritable> getCurrentVertex()
       throws IOException, InterruptedException {
-      BasicVertex<LongWritable, IntWritable,
-      FloatWritable, IntWritable> vertex =
+      Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
         BspUtils.<LongWritable, IntWritable, FloatWritable,
         IntWritable>createVertex(configuration);
       long tmpId = reverseIdOrder ?
@@ -89,18 +88,18 @@ public class SimpleSuperstepVertex exten
       IntWritable vertexValue =
           new IntWritable((int) (vertexId.get() * 10));
       Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
-      long destVertexId =
+      long targetVertexId =
           (vertexId.get() + 1) %
           (inputSplit.getNumSplits() * totalRecords);
       float edgeValue = vertexId.get() * 100f;
-      edgeMap.put(new LongWritable(destVertexId),
+      edgeMap.put(new LongWritable(targetVertexId),
           new FloatWritable(edgeValue));
       vertex.initialize(vertexId, vertexValue, edgeMap, null);
       ++recordsRead;
       if (LOG.isInfoEnabled()) {
-        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-            ", vertexValue=" + vertex.getVertexValue() +
-            ", destinationId=" + destVertexId +
+        LOG.info("next: Return vertexId=" + vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", targetVertexId=" + targetVertexId +
             ", edgeValue=" + edgeValue);
       }
       return vertex;
@@ -137,11 +136,11 @@ public class SimpleSuperstepVertex exten
     }
 
     @Override
-    public void writeVertex(BasicVertex<LongWritable, IntWritable,
-        FloatWritable, ?> vertex) throws IOException, InterruptedException {
+    public void writeVertex(Vertex<LongWritable, IntWritable,
+            FloatWritable, ?> vertex) throws IOException, InterruptedException {
       getRecordWriter().write(
-          new Text(vertex.getVertexId().toString()),
-          new Text(vertex.getVertexValue().toString()));
+          new Text(vertex.getId().toString()),
+          new Text(vertex.getValue().toString()));
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Tue Jul 24 23:37:42 2012
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.lib.TextVertexOutputFormat;
 
@@ -51,11 +51,11 @@ public class SimpleTextVertexOutputForma
 
     @Override
     public void writeVertex(
-      BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+      Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
       throws IOException, InterruptedException {
       getRecordWriter().write(
-          new Text(vertex.getVertexId().toString()),
-          new Text(vertex.getVertexValue().toString()));
+          new Text(vertex.getId().toString()),
+          new Text(vertex.getValue().toString()));
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java Tue Jul 24 23:37:42 2012
@@ -18,17 +18,17 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.Set;
-import java.util.HashSet;
+import java.util.TreeMap;
 
 /**
  * Demonstrates triangle closing in simple,
@@ -71,22 +71,20 @@ public class SimpleTriangleClosingVertex
   private Set<Integer> recvSet = new HashSet<Integer>();
 
   @Override
-  public void compute(Iterator<IntWritable> msgIterator) {
+  public void compute(Iterable<IntWritable> messages) {
     if (getSuperstep() == 0) {
       // obtain list of all out-edges from THIS vertex
-      Iterator<IntWritable> iterator = getOutEdgesIterator();
-      while (iterator.hasNext()) {
-        sendMsgToAllEdges(iterator.next());
+      for (Edge<IntWritable, NullWritable> edge : getEdges()) {
+        sendMessageToAllEdges(edge.getTargetVertexId());
       }
     } else {
-      while (msgIterator.hasNext()) {
-        IntWritable iw = msgIterator.next();
-        int inId = iw.get();
+      for (IntWritable message : messages) {
+        int inId = message.get();
         if (recvSet.contains(inId)) {
-          int current = closeMap.get(iw) == null ? 0 : inId;
-          closeMap.put(iw, current + 1);
+          int current = closeMap.get(message) == null ? 0 : inId;
+          closeMap.put(message, current + 1);
         }
-        if (inId != getVertexId().get()) {
+        if (inId != getId().get()) {
           recvSet.add(inId);
         }
       }
@@ -97,7 +95,7 @@ public class SimpleTriangleClosingVertex
       }
       IntArrayWritable result = new IntArrayWritable();
       result.set(temp);
-      setVertexValue(result);
+      setValue(result);
     }
     voteToHalt();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,10 @@
 
 package org.apache.giraph.examples;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.giraph.examples.SimpleSuperstepVertex.
-  SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.graph.GiraphJob;
+    SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +34,9 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 /**
  * Fully runnable example of how to
  * emit worker data to HDFS during a graph
@@ -52,15 +51,14 @@ public class SimpleVertexWithWorkerConte
   private static final int TESTLENGTH = 30;
 
   @Override
-  public void compute(Iterator<DoubleWritable> msgIterator)
-    throws IOException {
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
 
     long superstep = getSuperstep();
 
     if (superstep < TESTLENGTH) {
       EmitterWorkerContext emitter =
           (EmitterWorkerContext) getWorkerContext();
-      emitter.emit("vertexId=" + getVertexId() +
+      emitter.emit("vertexId=" + getId() +
           " superstep=" + superstep + "\n");
     } else {
       voteToHalt();

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Tue Jul 24 23:37:42 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.examples;
 
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.io.FloatWritable;
@@ -30,7 +31,6 @@ import org.apache.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * An example that simply uses its id, value, and edges to compute new data
@@ -142,7 +142,7 @@ public class VerifyMessage {
     }
 
     @Override
-    public void compute(Iterator<VerifiableMessage> msgIterator) {
+    public void compute(Iterable<VerifiableMessage> messages) {
       LongSumAggregator sumAggregator = (LongSumAggregator)
           getAggregator(LongSumAggregator.class.getName());
       if (getSuperstep() > SUPERSTEPS) {
@@ -152,61 +152,57 @@ public class VerifyMessage {
       if (LOG.isDebugEnabled()) {
         LOG.debug("compute: " + sumAggregator);
       }
-      sumAggregator.aggregate(getVertexId().get());
+      sumAggregator.aggregate(getId().get());
       if (LOG.isDebugEnabled()) {
         LOG.debug("compute: sum = " +
             sumAggregator.getAggregatedValue().get() +
-            " for vertex " + getVertexId());
+            " for vertex " + getId());
       }
       float msgValue = 0.0f;
-      while (msgIterator.hasNext()) {
-        VerifiableMessage msg = msgIterator.next();
-        msgValue += msg.value;
+      for (VerifiableMessage message : messages) {
+        msgValue += message.value;
         if (LOG.isDebugEnabled()) {
-          LOG.debug("compute: got msg = " + msg +
-              " for vertex id " + getVertexId() +
-              ", vertex value " + getVertexValue() +
+          LOG.debug("compute: got msg = " + message +
+              " for vertex id " + getId() +
+              ", vertex value " + getValue() +
               " on superstep " + getSuperstep());
         }
-        if (msg.superstep != getSuperstep() - 1) {
+        if (message.superstep != getSuperstep() - 1) {
           throw new IllegalStateException(
               "compute: Impossible to not get a messsage from " +
                   "the previous superstep, current superstep = " +
                   getSuperstep());
         }
-        if ((msg.sourceVertexId != getVertexId().get() - 1) &&
-            (getVertexId().get() != 0)) {
+        if ((message.sourceVertexId != getId().get() - 1) &&
+            (getId().get() != 0)) {
           throw new IllegalStateException(
               "compute: Impossible that this message didn't come " +
                   "from the previous vertex and came from " +
-                  msg.sourceVertexId);
+                  message.sourceVertexId);
         }
       }
-      int vertexValue = getVertexValue().get();
-      setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+      int vertexValue = getValue().get();
+      setValue(new IntWritable(vertexValue + (int) msgValue));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("compute: vertex " + getVertexId() +
-            " has value " + getVertexValue() +
+        LOG.debug("compute: vertex " + getId() +
+            " has value " + getValue() +
             " on superstep " + getSuperstep());
       }
-      for (Iterator<LongWritable> edges = getOutEdgesIterator();
-           edges.hasNext();) {
-        LongWritable targetVertexId = edges.next();
-        FloatWritable edgeValue = getEdgeValue(targetVertexId);
+      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+        FloatWritable newEdgeValue = new FloatWritable(
+            edge.getValue().get() + (float) vertexValue);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("compute: vertex " + getVertexId() +
-              " sending edgeValue " + edgeValue +
+          LOG.debug("compute: vertex " + getId() +
+              " sending edgeValue " + edge.getValue() +
               " vertexValue " + vertexValue +
-              " total " +
-              (edgeValue.get() + (float) vertexValue) +
-              " to vertex " + targetVertexId +
+              " total " + newEdgeValue +
+              " to vertex " + edge.getTargetVertexId() +
               " on superstep " + getSuperstep());
         }
-        edgeValue.set(edgeValue.get() + (float) vertexValue);
-        addEdge(targetVertexId, edgeValue);
-        sendMsg(targetVertexId,
+        addEdge(edge.getTargetVertexId(), newEdgeValue);
+        sendMessage(edge.getTargetVertexId(),
             new VerifiableMessage(
-                getSuperstep(), getVertexId().get(), edgeValue.get()));
+                getSuperstep(), getId().get(), newEdgeValue.get()));
       }
     }
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Tue Jul 24 23:37:42 2012
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.lib.TextVertexOutputFormat;
 import org.apache.hadoop.io.IntWritable;
@@ -63,13 +63,13 @@ public class VertexWithComponentTextOutp
     }
 
     @Override
-    public void writeVertex(BasicVertex<IntWritable, IntWritable,
-        NullWritable, ?> vertex) throws IOException,
+    public void writeVertex(Vertex<IntWritable, IntWritable,
+            NullWritable, ?> vertex) throws IOException,
         InterruptedException {
       StringBuilder output = new StringBuilder();
-      output.append(vertex.getVertexId().get());
+      output.append(vertex.getId().get());
       output.append('\t');
-      output.append(vertex.getVertexValue().get());
+      output.append(vertex.getValue().get());
       getRecordWriter().write(new Text(output.toString()), null);
     }
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Tue Jul 24 23:37:42 2012
@@ -39,7 +39,7 @@ public interface BasicVertexResolver<I e
    * excluding the normal case (a vertex already exists and has zero or more
    * messages sent it to).
    *
-   * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
+   * @param vertexId Vertex id (can be used for {@link Vertex}'s
    *        initialize())
    * @param vertex Original vertex or null if none
    * @param vertexChanges Changes that happened to this vertex or null if none
@@ -47,8 +47,8 @@ public interface BasicVertexResolver<I e
    * @return Vertex to be returned, if null, and a vertex currently exists
    *         it will be removed
    */
-  BasicVertex<I, V, E, M> resolve(I vertexId,
-      BasicVertex<I, V, E, M> vertex,
+  Vertex<I, V, E, M> resolve(I vertexId,
+      Vertex<I, V, E, M> vertex,
       VertexChanges<I, V, E, M> vertexChanges,
       Iterable<M> messages);
 
@@ -57,5 +57,5 @@ public interface BasicVertexResolver<I e
    *
    * @return Newly instantiated vertex.
    */
-  BasicVertex<I, V, E, M> instantiateVertex();
+  Vertex<I, V, E, M> instantiateVertex();
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Jul 24 23:37:42 2012
@@ -18,12 +18,16 @@
 
 package org.apache.giraph.graph;
 
-import net.iharder.Base64;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
 import org.apache.giraph.graph.GraphMapper.MapFunctions;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -48,6 +52,8 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import net.iharder.Base64;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -65,12 +71,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.giraph.graph.partition.MasterGraphPartitioner;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.PartitionUtils;
-import org.apache.giraph.utils.WritableUtils;
-
 /**
  * ZooKeeper-based implementation of {@link CentralizedService}.
  *
@@ -1556,8 +1556,8 @@ public class BspServiceMaster<I extends 
     // The master.compute() should run logically before the workers, so
     // increase the superstep counter it uses by one
     graphState.setSuperstep(superstep + 1);
-    graphState.setNumVertices(vertexCounter.getValue());
-    graphState.setNumEdges(edgeCounter.getValue());
+    graphState.setTotalNumVertices(vertexCounter.getValue());
+    graphState.setTotalNumEdges(edgeCounter.getValue());
     graphState.setContext(getContext());
     graphState.setGraphMapper(getGraphMapper());
     masterCompute.setGraphState(graphState);

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,12 @@
 
 package org.apache.giraph.graph;
 
-import net.iharder.Base64;
-
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.NettyWorkerClientServer;
 import org.apache.giraph.comm.RPCCommunications;
-import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
@@ -54,6 +52,8 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import net.iharder.Base64;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -319,7 +319,7 @@ public class BspServiceWorker<I extends 
     for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
       inputSplitCache.entrySet()) {
       if (!entry.getValue().getVertices().isEmpty()) {
-        commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
+        commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
             entry.getValue());
         entry.getValue().getVertices().clear();
       }
@@ -447,20 +447,20 @@ public class BspServiceWorker<I extends 
     long vertexCount = 0;
     long edgeCount = 0;
     while (vertexReader.nextVertex()) {
-      BasicVertex<I, V, E, M> readerVertex =
+      Vertex<I, V, E, M> readerVertex =
           vertexReader.getCurrentVertex();
-      if (readerVertex.getVertexId() == null) {
+      if (readerVertex.getId() == null) {
         throw new IllegalArgumentException(
             "loadVertices: Vertex reader returned a vertex " +
                 "without an id!  - " + readerVertex);
       }
-      if (readerVertex.getVertexValue() == null) {
-        readerVertex.setVertexValue(
+      if (readerVertex.getValue() == null) {
+        readerVertex.setValue(
             BspUtils.<V>createVertexValue(getConfiguration()));
       }
       PartitionOwner partitionOwner =
           workerGraphPartitioner.getPartitionOwner(
-              readerVertex.getVertexId());
+              readerVertex.getId());
       Partition<I, V, E, M> partition =
           inputSplitCache.get(partitionOwner);
       if (partition == null) {
@@ -469,23 +469,23 @@ public class BspServiceWorker<I extends 
             partitionOwner.getPartitionId());
         inputSplitCache.put(partitionOwner, partition);
       }
-      BasicVertex<I, V, E, M> oldVertex =
+      Vertex<I, V, E, M> oldVertex =
           partition.putVertex(readerVertex);
       if (oldVertex != null) {
         LOG.warn("readVertices: Replacing vertex " + oldVertex +
             " with " + readerVertex);
       }
       if (partition.getVertices().size() >= maxVerticesPerPartition) {
-        commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
+        commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
             partition);
         partition.getVertices().clear();
       }
       ++vertexCount;
-      edgeCount += readerVertex.getNumOutEdges();
+      edgeCount += readerVertex.getNumEdges();
       getContext().progress();
 
       ++totalVerticesLoaded;
-      totalEdgesLoaded += readerVertex.getNumOutEdges();
+      totalEdgesLoaded += readerVertex.getNumEdges();
       // Update status every half a million vertices
       if ((totalVerticesLoaded % 500000) == 0) {
         String status = "readVerticesFromInputSplit: Loaded " +
@@ -519,9 +519,9 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
-  public void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
-      Iterable<M> messageIterator) {
-    vertex.putMessages(messageIterator);
+  public void assignMessagesToVertex(Vertex<I, V, E, M> vertex,
+      Iterable<M> messages) {
+    vertex.putMessages(messages);
   }
 
   @Override
@@ -674,7 +674,7 @@ public class BspServiceWorker<I extends 
         new ArrayList<PartitionStats>();
     for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
       PartitionStats partitionStats =
-          new PartitionStats(partition.getPartitionId(),
+          new PartitionStats(partition.getId(),
               partition.getVertices().size(),
               0,
               partition.getEdgeCount());
@@ -1056,8 +1056,8 @@ public class BspServiceWorker<I extends 
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
     getGraphMapper().getGraphState().
-    setNumEdges(globalStats.getEdgeCount()).
-    setNumVertices(globalStats.getVertexCount());
+        setTotalNumEdges(globalStats.getEdgeCount()).
+        setTotalNumVertices(globalStats.getVertexCount());
     return globalStats.getHaltComputation();
   }
 
@@ -1080,7 +1080,7 @@ public class BspServiceWorker<I extends 
         vertexOutputFormat.createVertexWriter(getContext());
     vertexWriter.initialize(getContext());
     for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
-      for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+      for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
         vertexWriter.writeVertex(vertex);
       }
     }
@@ -1185,7 +1185,7 @@ public class BspServiceWorker<I extends 
       //   <index 0 start pos><partition id>
       //   <index 1 start pos><partition id>
       metadataOutput.writeLong(startPos);
-      metadataOutput.writeInt(partition.getPartitionId());
+      metadataOutput.writeInt(partition.getId());
       if (LOG.isDebugEnabled()) {
         LOG.debug("storeCheckpoint: Vertex file starting " +
             "offset = " + startPos + ", length = " +
@@ -1314,8 +1314,8 @@ public class BspServiceWorker<I extends 
               partitionId);
         }
         getGraphMapper().getGraphState().getWorkerCommunications().
-        sendPartitionReq(workerPartitionList.getKey(),
-            partition);
+            sendPartitionRequest(workerPartitionList.getKey(),
+                partition);
         getPartitionMap().remove(partitionId);
       }
     }
@@ -1426,10 +1426,10 @@ public class BspServiceWorker<I extends 
    */
   private void movePartitionsToWorker(
       WorkerServer<I, V, E, M> commService) {
-    Map<Integer, Collection<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+    Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
         commService.getInPartitionVertexMap();
     synchronized (inPartitionVertexMap) {
-      for (Entry<Integer, Collection<BasicVertex<I, V, E, M>>> entry :
+      for (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
         inPartitionVertexMap.entrySet()) {
         if (getPartitionMap().containsKey(entry.getKey())) {
           throw new IllegalStateException(
@@ -1443,7 +1443,7 @@ public class BspServiceWorker<I extends 
             new Partition<I, V, E, M>(getConfiguration(),
                 entry.getKey());
         synchronized (entry.getValue()) {
-          for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+          for (Vertex<I, V, E, M> vertex : entry.getValue()) {
             if (tmpPartition.putVertex(vertex) != null) {
               throw new IllegalStateException(
                   "moveVerticesToWorker: Vertex " + vertex +
@@ -1455,7 +1455,7 @@ public class BspServiceWorker<I extends 
                 entry.getValue().size() +
                 " vertices for partition id " + entry.getKey());
           }
-          getPartitionMap().put(tmpPartition.getPartitionId(),
+          getPartitionMap().put(tmpPartition.getId(),
               tmpPartition);
           entry.getValue().clear();
         }
@@ -1527,27 +1527,27 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
-  public PartitionOwner getVertexPartitionOwner(I vertexIndex) {
-    return workerGraphPartitioner.getPartitionOwner(vertexIndex);
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerGraphPartitioner.getPartitionOwner(vertexId);
   }
 
   /**
    * Get the partition for a vertex index.
    *
-   * @param vertexIndex Vertex index to search for the partition.
+   * @param vertexId Vertex index to search for the partition.
    * @return Partition that owns this vertex.
    */
-  public Partition<I, V, E, M> getPartition(I vertexIndex) {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+  public Partition<I, V, E, M> getPartition(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
     return workerPartitionMap.get(partitionOwner.getPartitionId());
   }
 
   @Override
-  public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+  public Vertex<I, V, E, M> getVertex(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
     if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
       return workerPartitionMap.get(
-          partitionOwner.getPartitionId()).getVertex(vertexIndex);
+          partitionOwner.getPartitionId()).getVertex(vertexId);
     } else {
       return null;
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Jul 24 23:37:42 2012
@@ -356,7 +356,7 @@ public class BspUtils {
   }
 
   /**
-   * Get the user's subclassed {@link BasicVertex}
+   * Get the user's subclassed {@link Vertex}
    *
    * @param <I> Vertex id
    * @param <V> Vertex data
@@ -368,11 +368,11 @@ public class BspUtils {
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public static <I extends WritableComparable, V extends Writable,
   E extends Writable, M extends Writable>
-  Class<? extends BasicVertex<I, V, E, M>> getVertexClass(Configuration conf) {
-    return (Class<? extends BasicVertex<I, V, E, M>>)
+  Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
+    return (Class<? extends Vertex<I, V, E, M>>)
       conf.getClass(GiraphJob.VERTEX_CLASS,
         null,
-        BasicVertex.class);
+        Vertex.class);
   }
 
   /**
@@ -387,10 +387,10 @@ public class BspUtils {
    */
   @SuppressWarnings("rawtypes")
   public static <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
+  E extends Writable, M extends Writable> Vertex<I, V, E, M>
   createVertex(Configuration conf) {
-    Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
-    BasicVertex<I, V, E, M> vertex =
+    Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+    Vertex<I, V, E, M> vertex =
       ReflectionUtils.newInstance(vertexClass, conf);
     return vertex;
   }
@@ -404,8 +404,8 @@ public class BspUtils {
    */
   @SuppressWarnings("unchecked")
   public static <I extends Writable> Class<I>
-  getVertexIndexClass(Configuration conf) {
-    return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
+  getVertexIdClass(Configuration conf) {
+    return (Class<I>) conf.getClass(GiraphJob.VERTEX_ID_CLASS,
       WritableComparable.class);
   }
 
@@ -418,16 +418,16 @@ public class BspUtils {
    */
   @SuppressWarnings("rawtypes")
   public static <I extends WritableComparable>
-  I createVertexIndex(Configuration conf) {
-    Class<I> vertexClass = getVertexIndexClass(conf);
+  I createVertexId(Configuration conf) {
+    Class<I> vertexIdClass = getVertexIdClass(conf);
     try {
-      return vertexClass.newInstance();
+      return vertexIdClass.newInstance();
     } catch (InstantiationException e) {
       throw new IllegalArgumentException(
-        "createVertexIndex: Failed to instantiate", e);
+        "createVertexId: Failed to instantiate", e);
     } catch (IllegalAccessException e) {
       throw new IllegalArgumentException(
-        "createVertexIndex: Illegally accessed", e);
+        "createVertexId: Illegally accessed", e);
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Tue Jul 24 23:37:42 2012
@@ -18,17 +18,11 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 /**
- * A complete edge, the destination vertex and the edge value.  Can only be one
+ * A complete edge, the target vertex and the edge value.  Can only be one
  * edge with a destination vertex id per edge map.
  *
  * @param <I> Vertex index
@@ -36,13 +30,11 @@ import java.io.IOException;
  */
 @SuppressWarnings("rawtypes")
 public class Edge<I extends WritableComparable, E extends Writable>
-    implements WritableComparable<Edge<I, E>>, Configurable {
-  /** Destination vertex id */
-  private I destVertexId = null;
+    implements Comparable<Edge<I, E>> {
+  /** Target vertex id */
+  private I targetVertexId = null;
   /** Edge value */
-  private E edgeValue = null;
-  /** Configuration - Used to instantiate classes */
-  private Configuration conf = null;
+  private E value = null;
 
   /**
    * Constructor for reflection
@@ -52,21 +44,21 @@ public class Edge<I extends WritableComp
   /**
    * Create the edge with final values
    *
-   * @param destVertexId Desination vertex id.
-   * @param edgeValue Value of the edge.
+   * @param targetVertexId Desination vertex id.
+   * @param value Value of the edge.
    */
-  public Edge(I destVertexId, E edgeValue) {
-    this.destVertexId = destVertexId;
-    this.edgeValue = edgeValue;
+  public Edge(I targetVertexId, E value) {
+    this.targetVertexId = targetVertexId;
+    this.value = value;
   }
 
   /**
-   * Get the destination vertex index of this edge
+   * Get the target vertex index of this edge
    *
-   * @return Destination vertex index of this edge
+   * @return Target vertex index of this edge
    */
-  public I getDestVertexId() {
-    return destVertexId;
+  public I getTargetVertexId() {
+    return targetVertexId;
   }
 
   /**
@@ -74,70 +66,38 @@ public class Edge<I extends WritableComp
    *
    * @return Edge value of this edge
    */
-  public E getEdgeValue() {
-    return edgeValue;
+  public E getValue() {
+    return value;
   }
 
   /**
    * Set the destination vertex index of this edge.
    *
-   * @param destVertexId new destination vertex
+   * @param targetVertexId new destination vertex
    */
-  public void setDestVertexId(I destVertexId) {
-    this.destVertexId = destVertexId;
+  public void setTargetVertexId(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
   }
 
   /**
    * Set the value for this edge.
    *
-   * @param edgeValue new edge value
+   * @param value new edge value
    */
-  public void setEdgeValue(E edgeValue) {
-    this.edgeValue = edgeValue;
+  public void setValue(E value) {
+    this.value = value;
   }
 
   @Override
   public String toString() {
-    return "(DestVertexIndex = " + destVertexId +
-        ", edgeValue = " + edgeValue  + ")";
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    destVertexId = BspUtils.<I>createVertexIndex(getConf());
-    destVertexId.readFields(input);
-    edgeValue = BspUtils.<E>createEdgeValue(getConf());
-    edgeValue.readFields(input);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    if (destVertexId == null) {
-      throw new IllegalStateException(
-          "write: Null destination vertex index");
-    }
-    if (edgeValue == null) {
-      throw new IllegalStateException(
-          "write: Null edge value");
-    }
-    destVertexId.write(output);
-    edgeValue.write(output);
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
+    return "(TargetVertexId = " + targetVertexId + ", " +
+        "value = " + value + ")";
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public int compareTo(Edge<I, E> edge) {
-    return destVertexId.compareTo(edge.getDestVertexId());
+    return targetVertexId.compareTo(edge.getTargetVertexId());
   }
 
   @Override
@@ -151,12 +111,11 @@ public class Edge<I extends WritableComp
 
     Edge edge = (Edge) o;
 
-    if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
-      edge.destVertexId != null) {
+    if (targetVertexId != null ? !targetVertexId.equals(edge.targetVertexId) :
+      edge.targetVertexId != null) {
       return false;
     }
-    if (edgeValue != null ?
-        !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
+    if (value != null ? !value.equals(edge.value) : edge.value != null) {
       return false;
     }
 
@@ -165,8 +124,8 @@ public class Edge<I extends WritableComp
 
   @Override
   public int hashCode() {
-    int result = destVertexId != null ? destVertexId.hashCode() : 0;
-    result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
+    int result = targetVertexId != null ? targetVertexId.hashCode() : 0;
+    result = 31 * result + (value != null ? value.hashCode() : 0);
     return result;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Tue Jul 24 23:37:42 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.giraph.utils.ComparisonUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -29,9 +28,6 @@ import com.google.common.collect.Lists;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +35,7 @@ import java.util.Map;
 /**
  * User applications can subclass {@link EdgeListVertex}, which stores
  * the outbound edges in an ArrayList (less memory as the cost of expensive
- * sorting and random-access lookup).  Good for static graphs.
+ * random-access lookup).  Good for static graphs.
  *
  * @param <I> Vertex index value
  * @param <V> Vertex value
@@ -52,268 +48,134 @@ public abstract class EdgeListVertex<I e
     extends MutableVertex<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
-  /** Vertex id */
-  private I vertexId = null;
-  /** Vertex value */
-  private V vertexValue = null;
-  /** List of the dest edge indices */
-  private List<I> destEdgeIndexList;
-  /** List of the dest edge values */
-  private List<E> destEdgeValueList;
+  /** List of edges */
+  private List<Edge<I, E>> edgeList = Lists.newArrayList();
   /** List of incoming messages from the previous superstep */
-  private List<M> msgList;
+  private List<M> messageList = Lists.newArrayList();
 
   @Override
-  public void initialize(I vertexId, V vertexValue,
-      Map<I, E> edges,
-      Iterable<M> messages) {
-    if (vertexId != null) {
-      setVertexId(vertexId);
-    }
-    if (vertexValue != null) {
-      setVertexValue(vertexValue);
-    }
-    if (edges != null && !edges.isEmpty()) {
-      destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
-      destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
-      List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
-      Collections.sort(sortedIndexList, new VertexIdComparator());
-      for (I index : sortedIndexList) {
-        destEdgeIndexList.add(index);
-        destEdgeValueList.add(edges.get(index));
+  public void initialize(I id, V value, Map<I, E> edges, Iterable<M> messages) {
+    super.initialize(id, value);
+    if (edges != null) {
+      for (Map.Entry<I, E> edge : edges.entrySet()) {
+        edgeList.add(new Edge<I, E>(edge.getKey(), edge.getValue()));
       }
-      sortedIndexList.clear();
-    } else {
-      destEdgeIndexList = Lists.newArrayListWithCapacity(0);
-      destEdgeValueList = Lists.newArrayListWithCapacity(0);
     }
     if (messages != null) {
-      msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
-      Iterables.<M>addAll(msgList, messages);
-    } else {
-      msgList = Lists.newArrayListWithCapacity(0);
+      Iterables.<M>addAll(messageList, messages);
     }
   }
 
   @Override
-  public int hashCode() {
-    return vertexId.hashCode() * 37 + vertexValue.hashCode();
+  public Iterable<Edge<I, E>> getEdges() {
+    return edgeList;
   }
 
   @Override
-  public boolean equals(Object other) {
-    if (other instanceof EdgeListVertex) {
-      @SuppressWarnings("unchecked")
-      EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
-      if (!getVertexId().equals(otherVertex.getVertexId())) {
+  public final boolean addEdge(I targetVertexId, E value) {
+    for (Edge<I, E> edge : getEdges()) {
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        LOG.warn("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            targetVertexId);
         return false;
       }
-      if (!getVertexValue().equals(otherVertex.getVertexValue())) {
-        return false;
-      }
-      if (!ComparisonUtils.equal(getMessages(),
-          otherVertex.getMessages())) {
-        return false;
-      }
-      return ComparisonUtils.equal(getOutEdgesIterator(),
-          otherVertex.getOutEdgesIterator());
-    }
-    return false;
-  }
-
-  /**
-   * Comparator for the vertex id
-   */
-  private class VertexIdComparator implements Comparator<I> {
-    @SuppressWarnings("unchecked")
-    @Override
-    public int compare(I index1, I index2) {
-      return index1.compareTo(index2);
     }
+    edgeList.add(new Edge<I, E>(targetVertexId, value));
+    return true;
   }
 
   @Override
-  public final boolean addEdge(I targetVertexId, E edgeValue) {
-    int pos = Collections.binarySearch(destEdgeIndexList,
-        targetVertexId,
-        new VertexIdComparator());
-    if (pos < 0) {
-      destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
-      destEdgeValueList.add(-1 * (pos + 1), edgeValue);
-      return true;
-    } else {
-      LOG.warn("addEdge: Vertex=" + vertexId +
-          ": already added an edge value for dest vertex id " +
-          targetVertexId);
-      return false;
-    }
-  }
-
-  @Override
-  public long getSuperstep() {
-    return getGraphState().getSuperstep();
-  }
-
-  @Override
-  public final void setVertexId(I vertexId) {
-    this.vertexId = vertexId;
-  }
-
-  @Override
-  public final I getVertexId() {
-    return vertexId;
-  }
-
-  @Override
-  public final V getVertexValue() {
-    return vertexValue;
-  }
-
-  @Override
-  public final void setVertexValue(V vertexValue) {
-    this.vertexValue = vertexValue;
-  }
-
-  @Override
-  public E getEdgeValue(I targetVertexId) {
-    int pos = Collections.binarySearch(destEdgeIndexList,
-        targetVertexId,
-        new VertexIdComparator());
-    if (pos < 0) {
-      return null;
-    } else {
-      return destEdgeValueList.get(pos);
-    }
+  public int getNumEdges() {
+    return edgeList.size();
   }
 
   @Override
-  public boolean hasEdge(I targetVertexId) {
-    int pos = Collections.binarySearch(destEdgeIndexList,
-        targetVertexId,
-        new VertexIdComparator());
-    if (pos < 0) {
-      return false;
+  public E removeEdge(I targetVertexId) {
+    for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext();) {
+      Edge<I, E> edge = edges.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        E edgeValue = edge.getValue();
+        edges.remove();
+        return edgeValue;
+      }
     }
-    return true;
-  }
-
-  /**
-   * Get an iterator to the edges on this vertex.
-   *
-   * @return A <em>sorted</em> iterator, as defined by the sort-order
-   *         of the vertex ids
-   */
-  @Override
-  public Iterator<I> getOutEdgesIterator() {
-    return destEdgeIndexList.iterator();
+    return null;
   }
 
   @Override
-  public int getNumOutEdges() {
-    return destEdgeIndexList.size();
+  void putMessages(Iterable<M> messages) {
+    messageList.clear();
+    Iterables.addAll(messageList, messages);
   }
 
   @Override
-  public E removeEdge(I targetVertexId) {
-    int pos = Collections.binarySearch(destEdgeIndexList,
-        targetVertexId,
-        new VertexIdComparator());
-    if (pos < 0) {
-      return null;
-    } else {
-      destEdgeIndexList.remove(pos);
-      return destEdgeValueList.remove(pos);
-    }
+  public Iterable<M> getMessages() {
+    return Iterables.unmodifiableIterable(messageList);
   }
 
   @Override
-  public final void sendMsgToAllEdges(M msg) {
-    if (msg == null) {
-      throw new IllegalArgumentException(
-          "sendMsgToAllEdges: Cannot send null message to all edges");
-    }
-    for (I index : destEdgeIndexList) {
-      sendMsg(index, msg);
-    }
+  public int getNumMessages() {
+    return messageList.size();
   }
 
   @Override
   public final void readFields(DataInput in) throws IOException {
-    vertexId = BspUtils.<I>createVertexIndex(getConf());
+    I vertexId = BspUtils.<I>createVertexId(getConf());
     vertexId.readFields(in);
-    boolean hasVertexValue = in.readBoolean();
-    if (hasVertexValue) {
-      vertexValue = BspUtils.<V>createVertexValue(getConf());
-      vertexValue.readFields(in);
-    }
-    int edgeListCount = in.readInt();
-    destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
-    destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
-    for (int i = 0; i < edgeListCount; ++i) {
-      I destVertexId = BspUtils.<I>createVertexIndex(getConf());
+    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    vertexValue.readFields(in);
+    super.initialize(vertexId, vertexValue);
+
+    int numEdges = in.readInt();
+    edgeList = Lists.newArrayListWithCapacity(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      targetVertexId.readFields(in);
       E edgeValue = BspUtils.<E>createEdgeValue(getConf());
-      destVertexId.readFields(in);
       edgeValue.readFields(in);
-      destEdgeIndexList.add(destVertexId);
-      destEdgeValueList.add(edgeValue);
-    }
-    int msgListSize = in.readInt();
-    msgList = Lists.newArrayListWithCapacity(msgListSize);
-    for (int i = 0; i < msgListSize; ++i) {
-      M msg = BspUtils.<M>createMessageValue(getConf());
-      msg.readFields(in);
-      msgList.add(msg);
+      edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
     }
-    halt = in.readBoolean();
-  }
 
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    vertexId.write(out);
-    out.writeBoolean(vertexValue != null);
-    if (vertexValue != null) {
-      vertexValue.write(out);
-    }
-    out.writeInt(destEdgeIndexList.size());
-    for (int i = 0; i < destEdgeIndexList.size(); ++i) {
-      destEdgeIndexList.get(i).write(out);
-      destEdgeValueList.get(i).write(out);
+    int numMessages = in.readInt();
+    messageList = Lists.newArrayListWithCapacity(numMessages);
+    for (int i = 0; i < numMessages; ++i) {
+      M message = BspUtils.<M>createMessageValue(getConf());
+      message.readFields(in);
+      messageList.add(message);
     }
-    out.writeInt(msgList.size());
-    for (M msg : msgList) {
-      msg.write(out);
+
+    boolean halt = in.readBoolean();
+    if (halt) {
+      voteToHalt();
+    } else {
+      wakeUp();
     }
-    out.writeBoolean(halt);
   }
 
   @Override
-  void putMessages(Iterable<M> messages) {
-    msgList.clear();
-    for (M message : messages) {
-      msgList.add(message);
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(edgeList.size());
+    for (Edge<I, E> edge : edgeList) {
+      edge.getTargetVertexId().write(out);
+      edge.getValue().write(out);
     }
-  }
 
-  @Override
-  public Iterable<M> getMessages() {
-    return Iterables.unmodifiableIterable(msgList);
-  }
+    out.writeInt(messageList.size());
+    for (M message : messageList) {
+      message.write(out);
+    }
 
-  @Override
-  public int getNumMessages() {
-    return msgList.size();
+    out.writeBoolean(isHalted());
   }
 
   @Override
   void releaseResources() {
     // Hint to GC to free the messages
-    msgList.clear();
-  }
-
-  @Override
-  public String toString() {
-    return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
-        ",#edges=" + getNumOutEdges() + ")";
+    messageList.clear();
   }
 }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Jul 24 23:37:42 2012
@@ -61,7 +61,7 @@ public class GiraphJob {
       "giraph.graphPartitionerFactoryClass";
 
   /** Vertex index class */
-  public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+  public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
   /** Vertex value class */
   public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
   /** Edge value class */
@@ -451,7 +451,7 @@ public class GiraphJob {
    * @param vertexClass Runs vertex computation
    */
   public final void setVertexClass(Class<?> vertexClass) {
-    getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
+    getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
   }
 
   /**



Mime
View raw message