giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1196639 [1/2] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Date Wed, 02 Nov 2011 15:40:03 GMT
Author: aching
Date: Wed Nov  2 15:40:02 2011
New Revision: 1196639

URL: http://svn.apache.org/viewvc?rev=1196639&view=rev
Log:
GIRAPH-36: Ensure that subclassing BasicVertex is possible by user
  apps. (jake.mannix via aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/pom.xml
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Nov  2 15:40:02 2011
@@ -2,13 +2,16 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
-  GIRAPH-50. Require Maven 3 in order to work with munging plugin.
+  GIRAPH-36: Ensure that subclassing BasicVertex is possible by user
+  apps. (jake.mannix via aching)
+
+  GIRAPH-50: Require Maven 3 in order to work with munging plugin.
   (jghoman)
   
-  GIRAPH-67. Provide AdjacencyList InputFormat for Ids of Strings and
+  GIRAPH-67: Provide AdjacencyList InputFormat for Ids of Strings and
   double values. (jghoman)
 
-  GIRAPH-56. Create a CSV TextOutputFormat. (jghoman)
+  GIRAPH-56: Create a CSV TextOutputFormat. (jghoman)
 
   GIRAPH-66: Add presentations section to website. (jghoman)
 
@@ -16,7 +19,7 @@ Release 0.70.0 - unreleased
   lists. (jghoman)
 
   GIRAPH-59: Missing some test if debug enabled before LOG.debug() and
-  LOG.info(). (guzhiwei via aching).
+  LOG.info(). (guzhiwei via aching)
 
   GIRAPH-48: numFlushThreads is 0 when doing a single worker 
   unittest. Changing the minimum to 1. (aching)
@@ -47,7 +50,7 @@ Release 0.70.0 - unreleased
 
   GIRAPH-31: Hide the SortedMap<I, Edge<I,E>> in Vertex from client
   visibility (impl. detail), replace with appropriate accessor
-  methods. jake.mannix via aching.
+  methods. (jake.mannix via aching)
 
   GIRAPH-30: NPE in ZooKeeperManager if base directory cannot be
   created. apurtell via aching.

Modified: incubator/giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/pom.xml?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/pom.xml (original)
+++ incubator/giraph/trunk/pom.xml Wed Nov  2 15:40:02 2011
@@ -461,6 +461,16 @@ under the License.
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-collections</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>r09</version>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-mapper-asl</artifactId>
       <version>${jackson.version}</version>

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java Wed Nov  2 15:40:02 2011
@@ -18,13 +18,16 @@
 
 package org.apache.giraph.benchmark;
 
+import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -33,6 +36,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /**
@@ -41,8 +45,8 @@ import java.util.Random;
  * and edges per vertex that is repeatable for the exact same parameter
  * (pseudo-random).
  */
-public class PseudoRandomVertexInputFormat extends
-        VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
+public class PseudoRandomVertexInputFormat<M extends Writable> extends
+        VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
     /** Set the number of aggregate vertices */
     public static final String AGGREGATE_VERTICES =
         "pseduoRandomVertexReader.aggregateVertices";
@@ -63,18 +67,18 @@ public class PseudoRandomVertexInputForm
     }
 
     @Override
-    public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
+    public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
             createVertexReader(InputSplit split, TaskAttemptContext context)
             throws IOException {
-        return new PseudoRandomVertexReader();
+        return new PseudoRandomVertexReader<M>();
     }
 
     /**
      * Used by {@link PseudoRandomVertexInputFormat} to read
      * pseudo-randomly generated data
      */
-    private static class PseudoRandomVertexReader implements
-            VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
+    private static class PseudoRandomVertexReader<M extends Writable> implements
+            VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
         /** Logger */
         private static final Logger LOG =
             Logger.getLogger(PseudoRandomVertexReader.class);
@@ -91,10 +95,15 @@ public class PseudoRandomVertexInputForm
         /** BspInputSplit (used only for index) */
         private BspInputSplit bspInputSplit;
 
+        private Configuration configuration;
+
+        public PseudoRandomVertexReader() {
+        }
+
         @Override
         public void initialize(InputSplit inputSplit,
                                TaskAttemptContext context) throws IOException {
-            Configuration configuration = context.getConfiguration();
+            configuration = context.getConfiguration();
             aggregateVertices =
                 configuration.getLong(
                     PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
@@ -132,19 +141,22 @@ public class PseudoRandomVertexInputForm
         }
 
         @Override
-        public boolean next(
-                MutableVertex<LongWritable, DoubleWritable, DoubleWritable, ?>
-                vertex) throws IOException {
-            if (verticesRead >= totalSplitVertices) {
-                return false;
-            }
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return totalSplitVertices > verticesRead;
+        }
+
+        @Override
+        public BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> getCurrentVertex()
+            throws IOException, InterruptedException {
+            BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> vertex =
+                BspUtils.createVertex(configuration);
             long vertexId = startingVertexId + verticesRead;
             // Seed on the vertex id to keep the vertex data the same when
             // on different number of workers, but other parameters are the
             // same.
             Random rand = new Random(vertexId);
-            vertex.setVertexId(new LongWritable(vertexId));
-            vertex.setVertexValue(new DoubleWritable(rand.nextDouble()));
+            DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
+            Map<LongWritable, DoubleWritable> edges = Maps.newHashMap();
             for (long i = 0; i < edgesPerVertex; ++i) {
                 LongWritable destVertexId = null;
                 do {
@@ -152,8 +164,9 @@ public class PseudoRandomVertexInputForm
                         new LongWritable(Math.abs(rand.nextLong()) %
                                          aggregateVertices);
                 } while (vertex.hasEdge(destVertexId));
-                vertex.addEdge(destVertexId, new DoubleWritable(rand.nextDouble()));
+                edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
             }
+            vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
 
             ++verticesRead;
             if (LOG.isDebugEnabled()) {
@@ -162,7 +175,7 @@ public class PseudoRandomVertexInputForm
                           ", vertexValue=" + vertex.getVertexValue() +
                           ", edgeMap=" + vertex.iterator());
             }
-            return true;
+            return vertex;
         }
 
         @Override

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Wed Nov  2 15:40:02 2011
@@ -18,12 +18,12 @@
 
 package org.apache.giraph.bsp;
 
-import java.io.IOException;
-
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.IOException;
+
 /**
  * Basic service interface shared by both {@link CentralizedServiceMaster} and
  * {@link CentralizedServiceWorker}.
@@ -44,7 +44,7 @@ public interface CentralizedService<I ex
      *
      * @return representation vertex
      */
-    Vertex<I, V, E, M> getRepresentativeVertex();
+    BasicVertex<I, V, E, M> getRepresentativeVertex();
 
     /**
      * Get the current global superstep of the application to work on.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Nov  2 15:40:02 2011
@@ -18,12 +18,29 @@
 
 package org.apache.giraph.comm;
 
-import java.io.IOException;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.VertexRange;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,29 +56,9 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.log4j.Logger;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.BasicVertex;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
-import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.VertexRange;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /*if[HADOOP_FACEBOOK]
 import org.apache.hadoop.ipc.ProtocolSignature;
 end[HADOOP_FACEBOOK]*/
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.mapreduce.Mapper;
 
 @SuppressWarnings("rawtypes")
 public abstract class BasicRPCCommunications<
@@ -128,9 +125,9 @@ public abstract class BasicRPCCommunicat
      * Map of vertex ranges to any incoming vertices from other workers.
      * (Synchronized)
      */
-    private final Map<I, List<Vertex<I, V, E, M>>>
+    private final Map<I, List<BasicVertex<I, V, E, M>>>
         inVertexRangeMap =
-            new TreeMap<I, List<Vertex<I, V, E, M>>>();
+            new TreeMap<I, List<BasicVertex<I, V, E, M>>>();
     /**
      * Map from vertex index to all vertex mutations
      */
@@ -567,11 +564,11 @@ end[HADOOP_FACEBOOK]*/
             }
             if (!inVertexRangeMap.containsKey(vertexIndexMax)) {
                 inVertexRangeMap.put(vertexIndexMax,
-                                     new ArrayList<Vertex<I, V, E, M>>());
+                                     new ArrayList<BasicVertex<I, V, E, M>>());
             }
-            List<Vertex<I, V, E, M>> tmpVertexList =
+            List<BasicVertex<I, V, E, M>> tmpVertexList =
                 inVertexRangeMap.get(vertexIndexMax);
-            for (Vertex<I, V, E, M> hadoopVertex : vertexList) {
+            for (BasicVertex<I, V, E, M> hadoopVertex : vertexList) {
                 tmpVertexList.add(hadoopVertex);
             }
         }
@@ -662,7 +659,7 @@ end[HADOOP_FACEBOOK]*/
                      addr + ", with vertex index " + vertexIndexMax +
                      ", list " + vertexList);
         }
-        if(peerConnections.get(addr).isProxy == false) {
+        if (peerConnections.get(addr).isProxy == false) {
             throw new RuntimeException("sendVertexList: Impossible to send " +
                 "to self for vertex index max " + vertexIndexMax);
         }
@@ -924,7 +921,7 @@ end[HADOOP_FACEBOOK]*/
                     conf, service.getGraphMapper().getGraphState());
             VertexRange<I, V, E, M> vertexRange =
                 service.getVertexRange(service.getSuperstep() - 1, vertexIndex);
-            Vertex<I, V, E, M> originalVertex =
+            BasicVertex<I, V, E, M> originalVertex =
                 vertexRange.getVertexMap().get(vertexIndex);
             List<M> msgList = inMessages.get(vertexIndex);
             if (originalVertex != null) {
@@ -996,7 +993,7 @@ end[HADOOP_FACEBOOK]*/
     }
 
     @Override
-    public Map<I, List<Vertex<I, V, E, M>>> getInVertexRangeMap() {
+    public Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap() {
         return inVertexRangeMap;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java Wed Nov  2 15:40:02 2011
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -38,7 +38,7 @@ public class VertexList<
         V extends Writable,
         E extends Writable,
         M extends Writable>
-        extends ArrayListWritable<Vertex<I, V, E, M>> {
+        extends ArrayListWritable<BasicVertex<I, V, E, M>> {
     /** Defining a layout version for a serializable class. */
     private static final long serialVersionUID = 1000L;
 
@@ -50,7 +50,7 @@ public class VertexList<
     @SuppressWarnings("unchecked")
     @Override
     public void setClass() {
-        setClass((Class<Vertex<I, V, E, M>>)
+        setClass((Class<BasicVertex<I, V, E, M>>)
                  BspUtils.<I, V, E, M>getVertexClass(getConf()));
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java Wed Nov  2 15:40:02 2011
@@ -18,17 +18,16 @@
 
 package org.apache.giraph.comm;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Public interface for workers to do message communication
  *
@@ -108,5 +107,5 @@ public interface WorkerCommunications<I 
      *
      * @return map of vertex ranges to vertices
      */
-    Map<I, List<Vertex<I, V, E, M>>> getInVertexRangeMap();
+    Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java Wed Nov  2 15:40:02 2011
@@ -18,17 +18,16 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.VertexInputFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * This VertexInputFormat is meant for testing/debugging.  It simply generates
@@ -36,8 +35,9 @@ import org.apache.giraph.graph.VertexInp
  */
 @SuppressWarnings("rawtypes")
 public abstract class GeneratedVertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends VertexInputFormat<I, V, E> {
+        I extends WritableComparable, V extends Writable, E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
 
     @Override
     public List<InputSplit> getSplits(JobContext context, int numWorkers)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Wed Nov  2 15:40:02 2011
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
-
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.VertexReader;
+import java.io.IOException;
 
 /**
  * Used by GeneratedVertexInputFormat to read some generated data
@@ -38,8 +37,9 @@ import org.apache.giraph.graph.VertexRea
  */
 @SuppressWarnings("rawtypes")
 public abstract class GeneratedVertexReader<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        implements VertexReader<I, V, E> {
+        I extends WritableComparable, V extends Writable, E extends Writable,
+        M extends Writable>
+        implements VertexReader<I, V, E, M> {
     /** Records read so far */
     protected long recordsRead = 0;
     /** Total records to read (on this split alone) */
@@ -47,19 +47,24 @@ public abstract class GeneratedVertexRea
     /** The input split from initialize(). */
     protected BspInputSplit inputSplit = null;
 
+    protected Configuration configuration = null;
+
     public static final String READER_VERTICES =
         "TestVertexReader.reader_vertices";
     public static final long DEFAULT_READER_VERTICES = 10;
 
+    public GeneratedVertexReader() {
+    }
+
     @Override
     final public void initialize(InputSplit inputSplit,
                                  TaskAttemptContext context)
             throws IOException {
-        Configuration configuration = context.getConfiguration();
-            totalRecords = configuration.getLong(
+        configuration = context.getConfiguration();
+        totalRecords = configuration.getLong(
                 GeneratedVertexReader.READER_VERTICES,
                 GeneratedVertexReader.DEFAULT_READER_VERTICES);
-            this.inputSplit = (BspInputSplit) inputSplit;
+        this.inputSplit = (BspInputSplit) inputSplit;
     }
 
     @Override

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Wed Nov  2 15:40:02 2011
@@ -18,7 +18,12 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.*;
+import com.google.common.collect.Maps;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.lib.TextVertexOutputFormat;
 import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.DoubleWritable;
@@ -32,12 +37,12 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Demonstrates the basic Pregel PageRank implementation.
  */
-public class SimplePageRankVertex extends
-        Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
     /** User can access this sum after the application finishes if local */
     public static long finalSum;
     /** User can access this min after the application finishes if local */
@@ -128,34 +133,44 @@ public class SimplePageRankVertex extend
      * Simple VertexReader that supports {@link SimplePageRankVertex}
      */
     public static class SimplePageRankVertexReader extends
-            GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+            GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
+                DoubleWritable> {
         /** Class logger */
         private static final Logger LOG =
             Logger.getLogger(SimplePageRankVertexReader.class);
+
+        public SimplePageRankVertexReader() {
+            super();
+        }
+
         @Override
-        public boolean next(MutableVertex<LongWritable, DoubleWritable,
-                            FloatWritable, ?> vertex) throws IOException {
-            if (totalRecords <= recordsRead) {
-                return false;
-            }
-            vertex.setVertexId(new LongWritable(
-                (inputSplit.getSplitIndex() * totalRecords) + recordsRead));
-            vertex.setVertexValue(
-                new DoubleWritable(vertex.getVertexId().get() * 10d));
+        public boolean nextVertex() {
+            return totalRecords > recordsRead;
+        }
+
+        @Override
+        public BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+          getCurrentVertex() throws IOException {
+            BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+                vertex = BspUtils.createVertex(configuration);
+
+            LongWritable vertexId = new LongWritable(
+                (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+            DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
             long destVertexId =
-                (vertex.getVertexId().get() + 1) %
+                (vertexId.get() + 1) %
                 (inputSplit.getNumSplits() * totalRecords);
-            float edgeValue = vertex.getVertexId().get() * 100f;
-            // Adds an edge to the neighbor vertex
-            vertex.addEdge(new LongWritable(destVertexId),
-                    new FloatWritable(edgeValue));
+            float edgeValue = vertexId.get() * 100f;
+            Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+            edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+            vertex.initialize(vertexId, vertexValue, edges, null);
             ++recordsRead;
             if (LOG.isInfoEnabled()) {
-	            LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-	                ", vertexValue=" + vertex.getVertexValue() +
-	                ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+	        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+	                 ", vertexValue=" + vertex.getVertexValue() +
+	                 ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
             }
-            return true;
+            return vertex;
         }
     }
 
@@ -164,9 +179,9 @@ public class SimplePageRankVertex extend
      */
     public static class SimplePageRankVertexInputFormat extends
             GeneratedVertexInputFormat<LongWritable,
-            DoubleWritable, FloatWritable> {
+            DoubleWritable, FloatWritable, DoubleWritable> {
         @Override
-        public VertexReader<LongWritable, DoubleWritable, FloatWritable>
+        public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
                 createVertexReader(InputSplit split,
                                    TaskAttemptContext context)
                                    throws IOException {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Wed Nov  2 15:40:02 2011
@@ -18,7 +18,13 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.*;
+import com.google.common.collect.Maps;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.lib.TextVertexInputFormat;
 import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
 import org.apache.giraph.lib.TextVertexOutputFormat;
@@ -43,6 +49,7 @@ import org.json.JSONException;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Demonstrates the basic Pregel shortest paths implementation.
@@ -104,9 +111,12 @@ public class SimpleShortestPathsVertex e
      * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
      */
     public static class SimpleShortestPathsVertexInputFormat extends
-            TextVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
+            TextVertexInputFormat<LongWritable,
+                                  DoubleWritable,
+                                  FloatWritable,
+                                  DoubleWritable> {
         @Override
-        public VertexReader<LongWritable, DoubleWritable, FloatWritable>
+        public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
                 createVertexReader(InputSplit split,
                                    TaskAttemptContext context)
                                    throws IOException {
@@ -127,7 +137,8 @@ public class SimpleShortestPathsVertex e
      * [1,4.3,[[2,2.1],[3,0.7]]]
      */
     public static class SimpleShortestPathsVertexReader extends
-            TextVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+            TextVertexReader<LongWritable,
+                DoubleWritable, FloatWritable, DoubleWritable> {
 
         public SimpleShortestPathsVertexReader(
                 RecordReader<LongWritable, Text> lineRecordReader) {
@@ -135,31 +146,36 @@ public class SimpleShortestPathsVertex e
         }
 
         @Override
-        public boolean next(MutableVertex<LongWritable,
-                            DoubleWritable, FloatWritable, ?> vertex)
-                throws IOException, InterruptedException {
-            if (!getRecordReader().nextKeyValue()) {
-                return false;
-            }
+        public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+                           DoubleWritable> getCurrentVertex()
+            throws IOException, InterruptedException {
+          BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+              DoubleWritable> vertex = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
+                  DoubleWritable>createVertex(getContext().getConfiguration());
 
             Text line = getRecordReader().getCurrentValue();
             try {
                 JSONArray jsonVertex = new JSONArray(line.toString());
-                vertex.setVertexId(
-                    new LongWritable(jsonVertex.getLong(0)));
-                vertex.setVertexValue(
-                    new DoubleWritable(jsonVertex.getDouble(1)));
+                LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
+                DoubleWritable vertexValue = new DoubleWritable(jsonVertex.getDouble(1));
+                Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
                 JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
                 for (int i = 0; i < jsonEdgeArray.length(); ++i) {
                     JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
-                    vertex.addEdge(new LongWritable(jsonEdge.getLong(0)),
+                    edges.put(new LongWritable(jsonEdge.getLong(0)),
                             new FloatWritable((float) jsonEdge.getDouble(1)));
                 }
+                vertex.initialize(vertexId, vertexValue, edges, null);
             } catch (JSONException e) {
                 throw new IllegalArgumentException(
                     "next: Couldn't get vertex from line " + line, e);
             }
-            return true;
+            return vertex;
+        }
+
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
         }
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Wed Nov  2 15:40:02 2011
@@ -18,8 +18,9 @@
 
 package org.apache.giraph.examples;
 
+import com.google.common.collect.Maps;
 import org.apache.giraph.graph.BasicVertex;
-import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
@@ -36,6 +37,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Just a simple Vertex compute implementation that executes 3 supersteps, then
@@ -54,35 +56,41 @@ public class SimpleSuperstepVertex exten
      * Simple VertexReader that supports {@link SimpleSuperstepVertex}
      */
     public static class SimpleSuperstepVertexReader extends
-            GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
+            GeneratedVertexReader<LongWritable, IntWritable, FloatWritable, IntWritable> {
         /** Class logger */
         private static final Logger LOG =
             Logger.getLogger(SimpleSuperstepVertexReader.class);
         @Override
-        public boolean next(MutableVertex<LongWritable, IntWritable,
-                            FloatWritable, ?> vertex) throws IOException {
-            if (totalRecords <= recordsRead) {
-                return false;
-            }
-            vertex.setVertexId(new LongWritable(
-                (inputSplit.getSplitIndex() * totalRecords) + recordsRead));
-            vertex.setVertexValue(
-                new IntWritable((int) (vertex.getVertexId().get() * 10)));
-            long destVertexId =
-                (vertex.getVertexId().get() + 1) %
-                (inputSplit.getNumSplits() * totalRecords);
-            float edgeValue = vertex.getVertexId().get() * 100f;
-            // Adds an edge to the neighbor vertex
-            vertex.addEdge(new LongWritable(destVertexId),
-                    new FloatWritable(edgeValue));
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return totalRecords > recordsRead;
+        }
+
+        public SimpleSuperstepVertexReader() {
+            super();
+        }
+
+        @Override
+        public BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> getCurrentVertex()
+            throws IOException, InterruptedException {
+            BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
+                BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>createVertex(
+                    configuration);
+            LongWritable vertexId = new LongWritable(
+                (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+            IntWritable vertexValue = new IntWritable((int) (vertexId.get() * 10));
+            Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
+            long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
+            float edgeValue = vertexId.get() * 100f;
+            edgeMap.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+            vertex.initialize(vertexId, vertexValue, edgeMap, null);
             ++recordsRead;
             if (LOG.isInfoEnabled()) {
-	            LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-	                ", vertexValue=" + vertex.getVertexValue() +
-	                ", destinationId=" + destVertexId +
-	                ", edgeValue=" + edgeValue);
+                LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+                         ", vertexValue=" + vertex.getVertexValue() +
+                         ", destinationId=" + destVertexId +
+                         ", edgeValue=" + edgeValue);
             }
-            return true;
+            return vertex;
         }
     }
 
@@ -91,9 +99,9 @@ public class SimpleSuperstepVertex exten
      */
     public static class SimpleSuperstepVertexInputFormat extends
             GeneratedVertexInputFormat<LongWritable,
-            IntWritable, FloatWritable> {
+            IntWritable, FloatWritable, IntWritable> {
         @Override
-        public VertexReader<LongWritable, IntWritable, FloatWritable>
+        public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
                 createVertexReader(InputSplit split,
                                    TaskAttemptContext context)
                                    throws IOException {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Wed Nov  2 15:40:02 2011
@@ -27,8 +27,9 @@ import org.apache.hadoop.mapreduce.Mappe
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
-/**
+ /**
  * Basic interface for writing a BSP application for computation.
  *
  * @param <I> vertex id
@@ -39,12 +40,14 @@ import java.util.List;
 @SuppressWarnings("rawtypes")
 public abstract class BasicVertex<I extends WritableComparable,
         V extends Writable, E extends Writable, M extends Writable>
-        implements AggregatorUsage, Iterable<I>, Configurable {
+        implements AggregatorUsage, Iterable<I>, Writable, Configurable {
     /** Global graph state **/
     private GraphState<I,V,E,M> graphState;
     /** Configuration */
     private Configuration conf;
 
+    public abstract void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
+
     /**
      * Optionally defined by the user to be executed once on all workers
      * before application has started.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Wed Nov  2 15:40:02 2011
@@ -18,11 +18,11 @@
 
 package org.apache.giraph.graph;
 
-import java.util.List;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.util.List;
+
 /**
  * Handles all the situations that can arise upon creation/removal of
  * vertices and edges.
@@ -55,5 +55,5 @@ public interface BasicVertexResolver<
      *
      * @return Newly instantiated vertex.
      */
-    MutableVertex<I, V, E, M> instantiateVertex();
+    BasicVertex<I, V, E, M> instantiateVertex();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed Nov  2 15:40:02 2011
@@ -124,7 +124,7 @@ public abstract class BspService <
     /** File system */
     private final FileSystem fs;
     /** Used to call pre/post application/superstep methods */
-    private final Vertex<I, V, E, M> representativeVertex;
+    private final BasicVertex<I, V, E, M> representativeVertex;
     /** Checkpoint frequency */
     private int checkpointFrequency = -1;
     /** Vertex range map based on the superstep below */
@@ -630,9 +630,8 @@ public abstract class BspService <
 
         this.representativeVertex =
             BspUtils.<I, V, E, M>createVertex(
-                getConfiguration(),
-                getGraphMapper().getGraphState());
-
+                getConfiguration());
+        this.representativeVertex.setGraphState(getGraphMapper().getGraphState());
         this.checkpointFrequency =
             conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
                           GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
@@ -675,7 +674,7 @@ public abstract class BspService <
      *
      * @return Representative vertex for this service.
      */
-    final public Vertex<I, V, E, M> getRepresentativeVertex() {
+    final public BasicVertex<I, V, E, M> getRepresentativeVertex() {
         return representativeVertex;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Nov  2 15:40:02 2011
@@ -18,6 +18,37 @@
 
 package org.apache.giraph.graph;
 
+import net.iharder.Base64;
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.CentralizedService;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.graph.GraphMapper.MapFunctions;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -38,42 +69,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import net.iharder.Base64;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.CentralizedService;
-import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.graph.GraphMapper.MapFunctions;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-
 /**
  * Zookeeper-based implementation of {@link CentralizedService}.
  */
@@ -204,8 +199,8 @@ public class BspServiceMaster<
      * @throws InterruptedException
      */
     private List<InputSplit> generateInputSplits(int numWorkers) {
-        VertexInputFormat<I, V, E> vertexInputFormat =
-            BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
+        VertexInputFormat<I, V, E, M> vertexInputFormat =
+            BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
         List<InputSplit> splits;
         try {
             splits = vertexInputFormat.getSplits(getContext(), numWorkers);
@@ -1786,7 +1781,7 @@ public class BspServiceMaster<
             }
         }
         incrCachedSuperstep();
-        if(getSuperstep() > 0) {  // counter starts at zero, so no need to incr
+        if (getSuperstep() > 0) {  // counter starts at zero, so no need to incr
             superstepCounter.increment(1);
         }
         try {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Nov  2 15:40:02 2011
@@ -313,7 +313,7 @@ public class BspServiceWorker<
                InstantiationException, IllegalAccessException {
         InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
 
-        List<Vertex<I, V, E, M>> vertexList =
+        List<BasicVertex<I, V, E, M>> vertexList =
             readVerticesFromInputSplit(inputSplit);
 
         if (LOG.isInfoEnabled()) {
@@ -381,19 +381,17 @@ public class BspServiceWorker<
      * @throws IOException
      * @throws InterruptedException
      */
-    private List<Vertex<I, V, E, M>> readVerticesFromInputSplit(
+    private List<BasicVertex<I, V, E, M>> readVerticesFromInputSplit(
             InputSplit inputSplit) throws IOException, InterruptedException {
-        List<Vertex<I, V, E, M>> vertexList =
-            new ArrayList<Vertex<I, V, E, M>>();
-        VertexInputFormat<I, V, E> vertexInputFormat =
-            BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
-        VertexReader<I, V, E> vertexReader =
+        List<BasicVertex<I, V, E, M>> vertexList =
+            new ArrayList<BasicVertex<I, V, E, M>>();
+        VertexInputFormat<I, V, E, M> vertexInputFormat =
+            BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+        VertexReader<I, V, E, M> vertexReader =
             vertexInputFormat.createVertexReader(inputSplit, getContext());
         vertexReader.initialize(inputSplit, getContext());
-        Vertex<I, V, E, M> readerVertex =
-            BspUtils.<I, V, E, M>createVertex(
-                getConfiguration(), getGraphMapper().getGraphState());
-        while (vertexReader.next(readerVertex)) {
+        while (vertexReader.nextVertex()) {
+            BasicVertex<I, V, E, M> readerVertex = vertexReader.getCurrentVertex();
             if (readerVertex.getVertexId() == null) {
                 throw new IllegalArgumentException(
                     "loadVertices: Vertex reader returned a vertex " +
@@ -418,8 +416,6 @@ public class BspServiceWorker<
                 }
             }
             vertexList.add(readerVertex);
-            readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration(),
-                getGraphMapper().getGraphState());
             getContext().progress();
         }
         vertexReader.close();
@@ -440,7 +436,7 @@ public class BspServiceWorker<
      * @throws IOException
      */
     private NavigableMap<I, VertexRange<I, V, E, M>> getVertexRanges(
-        InputSplit inputSplit, List<Vertex<I, V, E, M>> vertexList)
+        InputSplit inputSplit, List<BasicVertex<I, V, E, M>> vertexList)
         throws InstantiationException, IllegalAccessException, IOException {
 
         NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
@@ -488,7 +484,7 @@ public class BspServiceWorker<
         // Now iterate over the defined ranges, placing each vertex in its range
         Iterator<I> maxIndexVertexMapIt = vertexRangeMap.keySet().iterator();
         I currentVertexIndexMax = maxIndexVertexMapIt.next();
-        for (Vertex<I, V, E, M> vertex : vertexList) {
+        for (BasicVertex<I, V, E, M> vertex : vertexList) {
             @SuppressWarnings("unchecked")
             int compareTo = vertex.getVertexId().compareTo(currentVertexIndexMax);
             if (compareTo > 0) {
@@ -507,7 +503,7 @@ public class BspServiceWorker<
             }
             VertexRange<I, V, E, M> range =
                 vertexRangeMap.get(currentVertexIndexMax);
-            SortedMap<I, Vertex<I, V, E, M>> vertexMap = range.getVertexMap();
+            SortedMap<I, BasicVertex<I, V, E, M>> vertexMap = range.getVertexMap();
             if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
                 throw new IllegalStateException(
                     "loadVertices: Already contains vertex " +
@@ -1105,7 +1101,7 @@ public class BspServiceWorker<
                     new ByteArrayOutputStream();
                 DataOutput vertexOutput =
                     new DataOutputStream(vertexByteStream);
-                ((MutableVertex<I, V, E, M>) vertex).write(vertexOutput);
+                vertex.write(vertexOutput);
                 verticesOutputStream.write(vertexByteStream.toByteArray());
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("storeCheckpoint: Wrote vertex id = " +
@@ -1176,10 +1172,8 @@ public class BspServiceWorker<
         long vertexCount = dataStream.readLong();
         VertexRange<I, V, E, M> vertexRange = getVertexRangeMap().get(maxIndex);
         for (int i = 0; i < vertexCount; ++i) {
-            Vertex<I, V, E, M> vertex =
-                BspUtils.<I, V, E, M>createVertex(
-                    getConfiguration(),
-                    getGraphMapper().getGraphState());
+            BasicVertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
+            vertex.setGraphState(getGraphMapper().getGraphState());
             vertex.readFields(dataStream);
             // Add the vertex
             if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex)
@@ -1205,8 +1199,8 @@ public class BspServiceWorker<
         long vertexRangeCount = -1;
         for (VertexRange<I, V, E, M> vertexRange :
                 getVertexRangeMap().values()) {
-            if (vertexRange.getHostnameId().compareTo(
-                    getHostnamePartitionId()) == 0) {
+            if (vertexRange.getHostnameId()
+                    .compareTo(getHostnamePartitionId()) == 0) {
                 String metadataFile =
                     vertexRange.getCheckpointFilePrefix() +
                     CHECKPOINT_METADATA_POSTFIX;
@@ -1357,16 +1351,16 @@ public class BspServiceWorker<
         }
 
         // Add the vertices that were sent earlier.
-        Map<I, List<Vertex<I, V, E, M>>> inVertexRangeMap =
+        Map<I, List<BasicVertex<I, V, E, M>>> inVertexRangeMap =
             getGraphMapper().getWorkerCommunications().getInVertexRangeMap();
         synchronized (inVertexRangeMap) {
-            for (Entry<I, List<Vertex<I, V, E, M>>> entry :
+            for (Entry<I, List<BasicVertex<I, V, E, M>>> entry :
                     inVertexRangeMap.entrySet()) {
                 if (entry.getValue() == null || entry.getValue().isEmpty()) {
                     continue;
                 }
 
-                SortedMap<I, Vertex<I, V, E, M>> vertexMap =
+                SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
                     getVertexRangeMap().get(entry.getKey()).getVertexMap();
                 if (vertexMap.size() != 0) {
                     throw new RuntimeException(
@@ -1380,7 +1374,7 @@ public class BspServiceWorker<
                              entry.getValue().size() +
                              " vertices for max index " + entry.getKey());
                 }
-                for (Vertex<I, V, E, M> vertex : entry.getValue()) {
+                for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
                     if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
                         throw new IllegalStateException(
                             "exchangeVertexRanges: Vertex " + vertex +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Wed Nov  2 15:40:02 2011
@@ -37,10 +37,11 @@ public class BspUtils {
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static <I extends WritableComparable,
                    V extends Writable,
-                   E extends Writable>
-            Class<? extends VertexInputFormat<I, V, E>>
+                   E extends Writable,
+                   M extends Writable>
+            Class<? extends VertexInputFormat<I, V, E, M>>
             getVertexInputFormatClass(Configuration conf) {
-        return (Class<? extends VertexInputFormat<I, V, E>>)
+        return (Class<? extends VertexInputFormat<I, V, E, M>>)
                 conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
                               null,
                               VertexInputFormat.class);
@@ -53,12 +54,16 @@ public class BspUtils {
      * @return Instantiated user vertex input format class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable> VertexInputFormat<I, V, E>
+    public static <I extends WritableComparable,
+                   V extends Writable,
+                   E extends Writable,
+                   M extends Writable> VertexInputFormat<I, V, E, M>
             createVertexInputFormat(Configuration conf) {
-        Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
             getVertexInputFormatClass(conf);
-        return ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+        VertexInputFormat<I, V, E, M> inputFormat =
+            ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+        return inputFormat;
     }
 
     /**
@@ -209,12 +214,12 @@ public class BspUtils {
                    V extends Writable,
                    E extends Writable,
                    M extends Writable>
-            Class<? extends Vertex<I, V, E, M>>
+            Class<? extends BasicVertex<I, V, E, M>>
             getVertexClass(Configuration conf) {
-        return (Class<? extends Vertex<I, V, E, M>>)
+        return (Class<? extends BasicVertex<I, V, E, M>>)
                 conf.getClass(GiraphJob.VERTEX_CLASS,
                               null,
-                              Vertex.class);
+                              BasicVertex.class);
     }
 
     /**
@@ -225,14 +230,10 @@ public class BspUtils {
      */
     @SuppressWarnings("rawtypes")
     public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> Vertex<I, V, E, M>
-            createVertex(Configuration conf,
-            GraphState<I, V, E, M> graphState) {
-        Class<? extends Vertex<I, V, E, M>> vertexClass =
-            getVertexClass(conf);
-        Vertex<I, V, E, M> vertex =
-            ReflectionUtils.newInstance(vertexClass, conf);
-        vertex.setGraphState(graphState);
+            E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
+            createVertex(Configuration conf) {
+        Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+        BasicVertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
         return vertex;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Wed Nov  2 15:40:02 2011
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.graph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /**
  * A complete edge, the destination vertex and the edge value.  Can only be one
  * edge with a destination vertex id per edge map.
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class Edge<I extends WritableComparable, E extends Writable>
-        implements Writable, Configurable {
+        implements WritableComparable<Edge<I, E>>, Configurable {
     /** Destination vertex id */
     private I destVertexId = null;
     /** Edge value */
@@ -84,7 +84,7 @@ public class Edge<I extends WritableComp
      * @param destVertexId new destination vertex
      */
     public void setDestVertexId(I destVertexId) {
-       this.destVertexId = destVertexId;
+        this.destVertexId = destVertexId;
     }
 
     /**
@@ -93,7 +93,7 @@ public class Edge<I extends WritableComp
      * @param edgeValue new edge value
      */
     public void setEdgeValue(E edgeValue) {
-       this.edgeValue = edgeValue;
+        this.edgeValue = edgeValue;
     }
 
     @Override
@@ -134,4 +134,35 @@ public class Edge<I extends WritableComp
     public void setConf(Configuration conf) {
         this.conf = conf;
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Edge<I, E> edge) {
+        return destVertexId.compareTo(edge.getDestVertexId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) { return true; }
+        if (o == null || getClass() != o.getClass()) { return false; }
+
+        Edge edge = (Edge) o;
+
+        if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
+            edge.destVertexId != null) {
+            return false;
+        }
+        if (edgeValue != null ? !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = destVertexId != null ? destVertexId.hashCode() : 0;
+        result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
+        return result;
+    }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Nov  2 15:40:02 2011
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+
 /**
  * Limits the functions that can be called by the user.  Job is too flexible
  * for our needs.  For instance, our job should not have any reduce tasks.
@@ -330,7 +331,7 @@ public class GiraphJob extends Job {
      * @param vertexClass Runs vertex computation
      */
     final public void setVertexClass(Class<?> vertexClass) {
-        getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+        getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
     }
 
     /**

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Nov  2 15:40:02 2011
@@ -166,17 +166,17 @@ public class GraphMapper<I extends Writa
      * @param conf Configuration to get the various classes
      */
     public void determineClassTypes(Configuration conf) {
-        Class<? extends Vertex<I, V, E, M>> vertexClass =
+        Class<? extends BasicVertex<I, V, E, M>> vertexClass =
             BspUtils.<I, V, E, M>getVertexClass(conf);
-        List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
-            Vertex.class, vertexClass);
+        List<Class<?>> classList = ReflectionUtils.<BasicVertex>getTypeArguments(
+            BasicVertex.class, vertexClass);
         Type vertexIndexType = classList.get(0);
         Type vertexValueType = classList.get(1);
         Type edgeValueType = classList.get(2);
         Type messageValueType = classList.get(3);
 
-        Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
-            BspUtils.<I, V, E>getVertexInputFormatClass(conf);
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+            BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
         classList = ReflectionUtils.<VertexInputFormat>getTypeArguments(
             VertexInputFormat.class, vertexInputFormatClass);
         if (classList.get(0) == null) {
@@ -514,10 +514,9 @@ public class GraphMapper<I extends Writa
         }
         mapAlreadyRun = true;
 
-        graphState.setSuperstep(serviceWorker.getSuperstep()).
-            setContext(context).setGraphMapper(this).
-            setNumEdges(serviceWorker.getTotalEdges()).
-            setNumVertices(serviceWorker.getTotalVertices());
+        graphState.setSuperstep(serviceWorker.getSuperstep()).setContext(context)
+                  .setGraphMapper(this).setNumEdges(serviceWorker.getTotalEdges())
+                  .setNumVertices(serviceWorker.getTotalVertices());
 
         try {
             serviceWorker.getRepresentativeVertex().setGraphState(graphState);
@@ -606,13 +605,14 @@ public class GraphMapper<I extends Writa
                     continue;
                 }
 
-                for (Vertex<I, V, E, M> vertex :
+                for (BasicVertex<I, V, E, M> vertex :
                         entry.getValue().getVertexMap().values()) {
                     // Make sure every vertex has the current
                     // graphState before computing
+
                     vertex.setGraphState(graphState);
-                    if (vertex.isHalted() &&
-                            !vertex.getMsgList().isEmpty()) {
+                    if (vertex.isHalted() && !vertex.getMsgList().isEmpty()) {
+                      // TODO FIXME: if this is not a subclass of Vertex, this will blow up!
                         Vertex<I, V, E, M> activatedVertex =
                             (Vertex<I, V, E, M>) vertex;
                         activatedVertex.halt = false;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Wed Nov  2 15:40:02 2011
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.graph;
 
 import org.apache.hadoop.io.Writable;

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1196639&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Wed Nov  2 15:40:02 2011
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.mahout.math.list.DoubleArrayList;
+import org.apache.mahout.math.map.OpenLongFloatHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public abstract class LongDoubleFloatDoubleVertex extends
+        MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+
+    private long vertexId;
+    private double vertexValue;
+    private OpenLongFloatHashMap verticesWithEdgeValues = new OpenLongFloatHashMap();
+    private DoubleArrayList messageList = new DoubleArrayList();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+
+    @Override
+    public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
+        Map<LongWritable, FloatWritable> edgesW, List<DoubleWritable> messagesW) {
+      if (vertexIdW != null ) {
+        vertexId = vertexIdW.get();
+      }
+      if (vertexValueW != null) {
+        vertexValue = vertexValueW.get();
+      }
+      if (edgesW != null) {
+        for(Map.Entry<LongWritable, FloatWritable> entry : edgesW.entrySet()) {
+         verticesWithEdgeValues.put(entry.getKey().get(), entry.getValue().get());
+        }
+      }
+      if (messagesW != null) {
+        for(DoubleWritable m : messagesW) {
+          messageList.add(m.get());
+        }
+      }
+    }
+
+    @Override
+    public void preApplication()
+            throws InstantiationException, IllegalAccessException {
+        // Do nothing, might be overriden by the user
+    }
+
+    @Override
+    public void postApplication() {
+        // Do nothing, might be overriden by the user
+    }
+
+    @Override
+    public void preSuperstep() {
+        // Do nothing, might be overriden by the user
+    }
+
+    @Override
+    public void postSuperstep() {
+        // Do nothing, might be overriden by the user
+    }
+
+
+    @Override
+    public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) {
+        if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("addEdge: Vertex=" + vertexId +
+                        ": already added an edge value for dest vertex id " +
+                        targetId.get());
+            }
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    @Override
+    public FloatWritable removeEdge(LongWritable targetVertexId) {
+        long target = targetVertexId.get();
+        if (verticesWithEdgeValues.containsKey(target)) {
+            float value = verticesWithEdgeValues.get(target);
+            verticesWithEdgeValues.removeKey(target);
+            return new FloatWritable(value);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public final void setVertexId(LongWritable vertexId) {
+        this.vertexId = vertexId.get();
+    }
+
+    @Override
+    public final LongWritable getVertexId() {
+        return new LongWritable(vertexId); // TODO: possibly not make new objects every time?
+    }
+
+    @Override
+    public final DoubleWritable getVertexValue() {
+        return new DoubleWritable(vertexValue);
+    }
+
+    @Override
+    public final void setVertexValue(DoubleWritable vertexValue) {
+        this.vertexValue = vertexValue.get();
+    }
+
+    @Override
+    public final void sendMsg(LongWritable id, DoubleWritable msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException(
+                    "sendMsg: Cannot send null message to " + id);
+        }
+        getGraphState().getGraphMapper().getWorkerCommunications().sendMessageReq(id, msg);
+    }
+
+    @Override
+    public final void sendMsgToAllEdges(DoubleWritable msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException(
+                    "sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        LongWritable destVertex = new LongWritable();
+        for (long destVertexId : verticesWithEdgeValues.keys().elements()) {
+            destVertex.set(destVertexId);
+            sendMsg(destVertex, msg);
+        }
+    }
+ //   @Override
+  //  public MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> instantiateVertex() {
+  //      LongDoubleFloatDoubleVertex vertex = (LongDoubleFloatDoubleVertex)
+  //          BspUtils.<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>createVertex(
+  //              getGraphState().getContext().getConfiguration());
+  //      return vertex;
+  //  }
+
+    @Override
+    public long getNumVertices() {
+      System.out.println("in getNumVertices!");
+        return getGraphState().getNumVertices();
+    }
+
+    @Override
+    public long getNumEdges() {
+        return getGraphState().getNumEdges();
+    }
+
+    @Override
+    public Iterator<LongWritable> iterator() {
+      final long[] destVertices = verticesWithEdgeValues.keys().elements();
+      final LongWritable lw = new LongWritable();
+      return new Iterator<LongWritable>() {
+        int offset = 0;
+        @Override public boolean hasNext() {
+          return offset < destVertices.length;
+        }
+
+        @Override public LongWritable next() {
+          lw.set(destVertices[offset++]);
+          return lw;
+        }
+
+        @Override public void remove() {
+          throw new UnsupportedOperationException("Mutation disallowed for edge list via iterator");
+        }
+      };
+    }
+
+    @Override
+    public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+        return new FloatWritable(verticesWithEdgeValues.get(targetVertexId.get()));
+    }
+
+    @Override
+    public boolean hasEdge(LongWritable targetVertexId) {
+        return verticesWithEdgeValues.containsKey(targetVertexId.get());
+    }
+
+    @Override
+    public int getNumOutEdges() {
+        return verticesWithEdgeValues.size();
+    }
+
+    @Override
+    public long getSuperstep() {
+        return getGraphState().getSuperstep();
+    }
+
+    @Override
+    public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex)
+            throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().addVertexReq(vertex);
+    }
+
+    @Override
+    public void removeVertexRequest(LongWritable vertexId) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().removeVertexReq(vertexId);
+    }
+
+    @Override
+    public void addEdgeRequest(LongWritable vertexIndex,
+                               Edge<LongWritable, FloatWritable> edge) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().addEdgeReq(vertexIndex, edge);
+    }
+
+    @Override
+    public void removeEdgeRequest(LongWritable sourceVertexId,
+                                  LongWritable destVertexId) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().removeEdgeReq(sourceVertexId, destVertexId);
+    }
+
+    @Override
+    public final void voteToHalt() {
+        halt = true;
+    }
+
+    @Override
+    public final boolean isHalted() {
+        return halt;
+    }
+
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        vertexId = in.readLong();
+        vertexValue = in.readDouble();
+        long edgeMapSize = in.readLong();
+        for (long i = 0; i < edgeMapSize; ++i) {
+            long destVertexId = in.readLong();
+            float edgeValue = in.readFloat();
+            verticesWithEdgeValues.put(destVertexId, edgeValue);
+        }
+        long msgListSize = in.readLong();
+        for (long i = 0; i < msgListSize; ++i) {
+            messageList.add(in.readDouble());
+        }
+        halt = in.readBoolean();
+    }
+
+    @Override
+    public final void write(DataOutput out) throws IOException {
+        out.writeLong(vertexId);
+        out.writeDouble(vertexValue);
+        out.writeLong(verticesWithEdgeValues.size());
+        for(long destVertexId : verticesWithEdgeValues.keys().elements()) {
+            float edgeValue = verticesWithEdgeValues.get(destVertexId);
+            out.writeLong(destVertexId);
+            out.writeFloat(edgeValue);
+        }
+        out.writeLong(messageList.size());
+        for(double msg : messageList.elements()) {
+            out.writeDouble(msg);
+        }
+        out.writeBoolean(halt);
+    }
+
+    @Override
+    public List<DoubleWritable> getMsgList() {
+        final DoubleWritable message = new DoubleWritable();
+        return new AbstractList<DoubleWritable>() {
+            @Override public DoubleWritable get(int i) {
+                message.set(messageList.get(i));
+                return message;
+            }
+            @Override public int size() {
+                return messageList.size();
+            }
+            @Override public boolean add(DoubleWritable dw) {
+                messageList.add(dw.get());
+                return true;
+            }
+            @Override public boolean addAll(Collection<? extends DoubleWritable> collection) {
+                for(DoubleWritable dw : collection) {
+                    messageList.add(dw.get());
+                }
+                return true;
+            }
+            @Override public void clear() {
+                messageList.clear();
+            }
+            @Override public boolean isEmpty() {
+                return messageList.isEmpty();
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+                ",#edges=" + getNumOutEdges() + ")";
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Wed Nov  2 15:40:02 2011
@@ -30,7 +30,7 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public abstract class MutableVertex<I extends WritableComparable,
         V extends Writable, E extends Writable, M extends Writable>
-        extends BasicVertex<I, V, E, M> implements Writable {
+        extends BasicVertex<I, V, E, M> {
     /**
      * Set the vertex id
      *
@@ -57,15 +57,15 @@ public abstract class MutableVertex<I ex
     public abstract E removeEdge(I targetVertexId);
 
     /**
-     * Create a vertex for use in addVertexRequest().  Still need to get the
-     * vertex id and vertex value.
+     * Create a vertex to add to the graph.
      *
-     * @return Created vertex for addVertexRequest.
+     * @return A new vertex for adding to the graph
      */
     public MutableVertex<I, V, E, M> instantiateVertex() {
-        Vertex<I, V, E, M> mutableVertex =
-            BspUtils.createVertex(getContext().getConfiguration(),
-                                  getGraphState());
+        MutableVertex<I, V, E, M> mutableVertex =
+            (MutableVertex<I, V, E, M>) BspUtils
+               .<I, V, E, M>createVertex(getContext().getConfiguration());
+        mutableVertex.setGraphState(getGraphState());
         return mutableVertex;
     }
 
@@ -79,7 +79,7 @@ public abstract class MutableVertex<I ex
             throws IOException {
         getGraphState().getGraphMapper().getWorkerCommunications().
         addVertexReq(vertex);
-}
+    }
 
     /**
      * Request to remove a vertex from the graph

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Wed Nov  2 15:40:02 2011
@@ -25,7 +25,12 @@ import org.apache.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 /**
  * User applications often subclass {@link Vertex}, which stores the outbound
@@ -59,6 +64,24 @@ public abstract class Vertex<I extends W
     private final List<M> msgList = new ArrayList<M>();
 
     @Override
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+          setVertexValue(vertexValue);
+        }
+        if (edges != null && !edges.isEmpty()) {
+          for(Map.Entry<I, E> entry : edges.entrySet()) {
+            destEdgeMap.put(entry.getKey(), new Edge<I, E>(entry.getKey(), entry.getValue()));
+          }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
+
+    @Override
     public void preApplication()
             throws InstantiationException, IllegalAccessException {
         // Do nothing, might be overriden by the user
@@ -96,6 +119,11 @@ public abstract class Vertex<I extends W
     }
 
     @Override
+    public long getSuperstep() {
+        return getGraphState().getSuperstep();
+    }
+
+    @Override
     public final void setVertexId(I vertexId) {
         this.vertexId = vertexId;
     }
@@ -163,6 +191,34 @@ public abstract class Vertex<I extends W
         }
     }
 
+
+    @Override
+    public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
+            throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            addVertexReq(vertex);
+    }
+
+    @Override
+    public void removeVertexRequest(I vertexId) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            removeVertexReq(vertexId);
+    }
+
+    @Override
+    public void addEdgeRequest(I vertexIndex,
+                               Edge<I, E> edge) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            addEdgeReq(vertexIndex, edge);
+    }
+
+    @Override
+    public void removeEdgeRequest(I sourceVertexId,
+                                  I destVertexId) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            removeEdgeReq(sourceVertexId, destVertexId);
+    }
+
     @Override
     public final void voteToHalt() {
         halt = true;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java Wed Nov  2 15:40:02 2011
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Use this to load data for a BSP application.  Note that the InputSplit must
  * also implement Writable.  The InputSplits will determine the partitioning of
@@ -39,7 +39,8 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexInputFormat<I extends WritableComparable,
-        V extends Writable, E extends Writable> {
+        V extends Writable, E extends Writable, M extends Writable> {
+
     /**
      * Logically split the vertices for a graph processing application.
      *
@@ -73,7 +74,7 @@ public abstract class VertexInputFormat<
      * @throws IOException
      * @throws InterruptedException
      */
-    public abstract VertexReader<I, V, E> createVertexReader(
+    public abstract VertexReader<I, V, E, M> createVertexReader(
         InputSplit split,
         TaskAttemptContext context) throws IOException;
 }



Mime
View raw message