incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1201987 [1/5] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Date Tue, 15 Nov 2011 00:54:22 GMT
Author: aching
Date: Tue Nov 15 00:54:20 2011
New Revision: 1201987

URL: http://svn.apache.org/viewvc?rev=1201987&view=rev
Log:
GIRAPH-11: Improve the graph distribution of Giraph. (aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java   (with props)
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
Removed:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SuperstepBalancer.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AutoBalancer.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/StaticBalancer.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRangeBalancer.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/pom.xml
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Tue Nov 15 00:54:20 2011
@@ -1,7 +1,9 @@
 Giraph Change Log
 
 Release 0.70.0 - unreleased
- 
+
+  GIRAPH-11: Improve the graph distribution of Giraph. (aching)
+  
   GIRAPH-64: Create VertexRunner to make it easier to run users'
   computations. (jghoman)
  

Modified: incubator/giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/pom.xml?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/pom.xml (original)
+++ incubator/giraph/trunk/pom.xml Tue Nov 15 00:54:20 2011
@@ -499,7 +499,7 @@ under the License.
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
-      <version>3.3.1</version>
+      <version>3.3.3</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java Tue Nov 15 00:54:20 2011
@@ -147,7 +147,7 @@ public class PseudoRandomVertexInputForm
 
         @Override
         public BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> getCurrentVertex()
-            throws IOException, InterruptedException {
+                throws IOException, InterruptedException {
             BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> vertex =
                 BspUtils.createVertex(configuration);
             long vertexId = startingVertexId + verticesRead;
@@ -163,11 +163,10 @@ public class PseudoRandomVertexInputForm
                     destVertexId =
                         new LongWritable(Math.abs(rand.nextLong()) %
                                          aggregateVertices);
-                } while (vertex.hasEdge(destVertexId));
+                } while (edges.containsKey(destVertexId));
                 edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
             }
             vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
-
             ++verticesRead;
             if (LOG.isDebugEnabled()) {
                 LOG.debug("next: Return vertexId=" +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 15 00:54:20 2011
@@ -19,14 +19,19 @@
 package org.apache.giraph.bsp;
 
 import java.io.IOException;
-import java.util.NavigableMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.giraph.graph.AggregatorUsage;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.GraphMapper;
-import org.apache.giraph.graph.VertexRange;
-import org.apache.giraph.graph.BasicVertexRangeBalancer;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.WorkerContext;
 
 /**
@@ -41,41 +46,34 @@ public interface CentralizedServiceWorke
         M extends Writable>
         extends CentralizedService<I, V, E, M>, AggregatorUsage {
     /**
-     * Get the hostname of this worker
+     * Get the worker information
      *
-     * @return hostname of this worker
+     * @return Worker information
      */
-    String getHostname();
-
-    /**
-     * Get the port of the RPC server on this worker.
-     *
-     * @return RPC server of this worker
-     */
-    int getPort();
+    WorkerInfo getWorkerInfo();
 
    /**
-    * 
+    *
     * @return worker's WorkerContext
     */
     WorkerContext getWorkerContext();
 
     /**
-     * Get a synchronized map to the partitions and their sorted vertex lists.
-     * This could be used to run compute for the vertices or checkpointing.
+     * Get a map of the partition id to the partition for this worker.
+     * The partitions contain the vertices for
+     * this worker and can be used to run compute() for the vertices or do
+     * checkpointing.
      *
-     * @return map of max vertex index to list of vertices on that vertex range
+     * @return Map of partition id to partition for this worker.
      */
-    NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap();
+    Map<Integer, Partition<I, V, E, M>> getPartitionMap();
 
     /**
-     * Get the current map to the partitions and their sorted vertex lists.
-     * This is needed by the communication service to shift incoming messages
-     * to the vertex lists before the new map gets synchronized.
+     * Get a collection of all the partition owners.
      *
-     * @return map of max vertex index to list of vertices on that vertex range
+     * @return Collection of all the partition owners.
      */
-    NavigableMap<I, VertexRange<I, V, E, M>> getCurrentVertexRangeMap();
+    Collection<? extends PartitionOwner> getPartitionOwners();
 
     /**
      *  Both the vertices and the messages need to be checkpointed in order
@@ -98,58 +96,57 @@ public interface CentralizedServiceWorke
      * Take all steps prior to actually beginning the computation of a
      * superstep.
      *
-     * @return true if part of this superstep, false otherwise
+     * @return Collection of all the partition owners from the master for this
+     *         superstep.
      */
-    boolean startSuperstep();
+    Collection<? extends PartitionOwner> startSuperstep();
 
     /**
      * Worker is done with its portion of the superstep.  Report the
      * worker level statistics after the computation.
      *
-     * @param workerFinishedVertices Number of finished vertices on this worker
-     * @param workerVertices Number of vertices on this worker
-     * @param workerEdges Number of edges on this worker
+     * @param partitionStatsList All the partition stats for this worker
      * @param workersSentMessages Number of messages sent on this worker
      * @return true if this is the last superstep, false otherwise
      */
-    boolean finishSuperstep(long workerFinishedVertices,
-                            long workerVertices,
-                            long workerEdges,
+    boolean finishSuperstep(List<PartitionStats> partitionStatsList,
                             long workersSentMessages);
-
     /**
-     * Every client will need to get a vertex range for a vertex id so that
-     * they know where to sent the request.
+     * Get the partition that a vertex index would belong to
      *
-     * @param superstep Superstep to look for
-     * @param vertexIndex Vertex index to look for
-     * @return VertexRange that should contain this vertex if it exists
+     * @param vertexIndex Index of the vertex that is used to find the correct
+     *        partition.
+     * @return Correct partition if exists on this worker, null otherwise.
      */
-    VertexRange<I, V, E, M> getVertexRange(long superstep, I vertexIndex);
+    public Partition<I, V, E, M> getPartition(I vertexIndex);
 
     /**
-     * Get the total vertices in the entire application during a given
-     * superstep.  Note that this is the number of vertices prior to the
-     * superstep starting and does not change during the superstep.
+     * Every client will need to get a partition owner from a vertex id so that
+     * they know which worker to send the request to.
      *
-     * @return count of all the vertices (local and non-local together)
+     * @param superstep Superstep to look for
+     * @param vertexIndex Vertex index to look for
+     * @return PartitionOwner that should contain this vertex if it exists
      */
-    long getTotalVertices();
+    PartitionOwner getVertexPartitionOwner(I vertexIndex);
 
     /**
-     * Get the total edges in the entire application during a given
-     * superstep.  Note that this is the number of edges prior to the
-     * superstep starting and does not change during the superstep.
+     * Look up a vertex on a worker given its vertex index.
      *
-     * @return count of all the edges (local and non-local together)
+     * @param vertexIndex Vertex index to look for
+     * @return Vertex if it exists on this worker.
      */
-    long getTotalEdges();
+    BasicVertex<I, V, E, M> getVertex(I vertexIndex);
 
     /**
-     * If desired by the user, vertex ranges are redistributed among workers
-     * according to the chosen {@link BasicVertexRangeBalancer}.
+     * If desired by the user, vertex partitions are redistributed among
+     * workers according to the chosen {@link GraphPartitioner}.
+     *
+     * @param masterSetPartitionOwners Partition owner info passed from the
+     *        master.
      */
-    void exchangeVertexRanges();
+    void exchangeVertexPartitions(
+        Collection<? extends PartitionOwner> masterSetPartitionOwners);
 
     /**
      * Get the GraphMapper that this service is using.  Vertices need to know

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Tue Nov 15 00:54:20 2011
@@ -27,7 +27,6 @@ import org.apache.giraph.graph.MutableVe
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.VertexRange;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -48,7 +47,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -56,6 +54,10 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+
 /*if[HADOOP_FACEBOOK]
 import org.apache.hadoop.ipc.ProtocolSignature;
 end[HADOOP_FACEBOOK]*/
@@ -96,6 +98,12 @@ public abstract class BasicRPCCommunicat
     private final Map<InetSocketAddress, PeerConnection> peerConnections =
         new HashMap<InetSocketAddress, PeerConnection>();
     /**
+     * Cached map of partition ids to remote socket address.  Needs to be
+     * synchronized.
+     */
+    private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
+        new HashMap<Integer, InetSocketAddress>();
+    /**
      * Thread pool for message flush threads
      */
     private final ExecutorService executor;
@@ -122,24 +130,20 @@ public abstract class BasicRPCCommunicat
     private final Map<I, List<M>> transientInMessages =
         new HashMap<I, List<M>>();
     /**
-     * Map of vertex ranges to any incoming vertices from other workers.
+     * Map of partition ids to incoming vertices from other workers.
      * (Synchronized)
      */
-    private final Map<I, List<BasicVertex<I, V, E, M>>>
-        inVertexRangeMap =
-            new TreeMap<I, List<BasicVertex<I, V, E, M>>>();
+    private final Map<Integer, List<BasicVertex<I, V, E, M>>>
+        inPartitionVertexMap =
+            new HashMap<Integer, List<BasicVertex<I, V, E, M>>>();
+
     /**
      * Map from vertex index to all vertex mutations
      */
     private final Map<I, VertexMutations<I, V, E, M>>
         inVertexMutationsMap =
             new TreeMap<I, VertexMutations<I, V, E, M>>();
-    /**
-     * Cached map of vertex ranges to remote socket address.  Needs to be
-     * synchronized.
-     */
-    private final Map<I, InetSocketAddress> vertexIndexMapAddressMap =
-        new HashMap<I, InetSocketAddress>();
+
     /** Maximum size of cached message list, before sending it out */
     private final int maxSize;
     /** Cached job id */
@@ -380,8 +384,18 @@ public abstract class BasicRPCCommunicat
                      numHandlers + " handlers and " + numFlushThreads +
                      " flush threads");
         }
+    }
 
-        connectAllRPCProxys(this.jobId, this.jobToken);
+    @Override
+    public void setup() {
+        try {
+            connectAllRPCProxys(this.jobId, this.jobToken);
+        } catch (IOException e) {
+            throw new IllegalStateException("setup: Got IOException", e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("setup: Got InterruptedException",
+                                            e);
+        }
     }
 
     protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
@@ -400,17 +414,17 @@ public abstract class BasicRPCCommunicat
     private void connectAllRPCProxys(String jobId, J jobToken)
             throws IOException, InterruptedException {
         final int maxTries = 5;
-        for (VertexRange<I, V, E, M> vertexRange :
-                service.getVertexRangeMap().values()) {
+        for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
             int tries = 0;
             while (tries < maxTries) {
                 try {
-                    startPeerConnectionThread(vertexRange, jobId, jobToken);
+                    startPeerConnectionThread(
+                        partitionOwner.getWorkerInfo(), jobId, jobToken);
                     break;
                 } catch (IOException e) {
                     LOG.warn("connectAllRPCProxys: Failed on attempt " +
                              tries + " of " + maxTries +
-                             " to connect to " + vertexRange.toString());
+                             " to connect to " + partitionOwner.toString(), e);
                     ++tries;
                 }
             }
@@ -418,24 +432,27 @@ public abstract class BasicRPCCommunicat
     }
 
     /**
-     * Starts a thread for a vertex range if any only if the inet socket
+     * Creates the connections to remote RPCs if and only if the inet socket
      * address doesn't already exist.
      *
-     * @param vertexRange
+     * @param workerInfo Worker info (hostname and port) of the peer to connect to
+     * @param jobId Id of the job
+     * @param jobToken Required for secure Hadoop
      * @throws IOException
+     * @throws InterruptedException
      */
-    private void startPeerConnectionThread(VertexRange<I, V, E, M> vertexRange,
+    private void startPeerConnectionThread(WorkerInfo workerInfo,
                                            String jobId,
                                            J jobToken)
             throws IOException, InterruptedException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("startPeerConnectionThread: hostname " +
-                      vertexRange.getHostname() + ", port " +
-                      vertexRange.getPort());
+                      workerInfo.getHostname() + ", port " +
+                      workerInfo.getPort());
         }
         final InetSocketAddress addr =
-            new InetSocketAddress(vertexRange.getHostname(),
-                                  vertexRange.getPort());
+            new InetSocketAddress(workerInfo.getHostname(),
+                                  workerInfo.getPort());
         // Cheap way to hold both the hostname and port (rather than
         // make a class)
         InetSocketAddress addrUnresolved =
@@ -448,9 +465,7 @@ public abstract class BasicRPCCommunicat
             outMsgMap = outMessages.get(addrUnresolved);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("startPeerConnectionThread: Connecting to " +
-                          vertexRange.getHostname() + ", port = " +
-                          vertexRange.getPort() + ", max index = " +
-                          vertexRange.getMaxIndex() + ", addr = " + addr +
+                          workerInfo.toString() + ", addr = " + addr +
                           " if outMsgMap (" + outMsgMap + ") == null ");
             }
             if (outMsgMap != null) { // this host has already been added
@@ -551,25 +566,26 @@ end[HADOOP_FACEBOOK]*/
     }
 
     @Override
-    public final void putVertexList(I vertexIndexMax,
+    public final void putVertexList(int partitionId,
                                     VertexList<I, V, E, M> vertexList)
             throws IOException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("putVertexList: On vertex range " + vertexIndexMax +
+            LOG.debug("putVertexList: On partition id " + partitionId +
                       " adding vertex list of size " + vertexList.size());
         }
-        synchronized (inVertexRangeMap) {
+        synchronized (inPartitionVertexMap) {
             if (vertexList.size() == 0) {
                 return;
             }
-            if (!inVertexRangeMap.containsKey(vertexIndexMax)) {
-                inVertexRangeMap.put(vertexIndexMax,
-                                     new ArrayList<BasicVertex<I, V, E, M>>());
-            }
-            List<BasicVertex<I, V, E, M>> tmpVertexList =
-                inVertexRangeMap.get(vertexIndexMax);
-            for (BasicVertex<I, V, E, M> hadoopVertex : vertexList) {
-                tmpVertexList.add(hadoopVertex);
+            if (!inPartitionVertexMap.containsKey(partitionId)) {
+                inPartitionVertexMap.put(partitionId,
+                    new ArrayList<BasicVertex<I, V, E, M>>(vertexList));
+            } else {
+                List<BasicVertex<I, V, E, M>> tmpVertexList =
+                    inPartitionVertexMap.get(partitionId);
+                for (BasicVertex<I, V, E, M> hadoopVertex : vertexList) {
+                    tmpVertexList.add(hadoopVertex);
+                }
             }
         }
     }
@@ -644,31 +660,28 @@ end[HADOOP_FACEBOOK]*/
     }
 
     @Override
-    public final void sendVertexListReq(I vertexIndexMax,
-                                        List<BasicVertex<I, V, E, M>> vertexList) {
+    public final void sendPartitionReq(WorkerInfo workerInfo,
+                                       Partition<I, V, E, M> partition) {
         // Internally, break up the sending so that the list doesn't get too
         // big.
         VertexList<I, V, E, M> hadoopVertexList =
             new VertexList<I, V, E, M>();
-        InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
+        InetSocketAddress addr =
+            getInetSocketAddress(workerInfo, partition.getPartitionId());
         CommunicationsInterface<I, V, E, M> rpcProxy =
             peerConnections.get(addr).getRPCProxy();
 
         if (LOG.isInfoEnabled()) {
-            LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
-                     addr + ", with vertex index " + vertexIndexMax +
-                     ", list " + vertexList);
-        }
-        if (peerConnections.get(addr).isProxy == false) {
-            throw new RuntimeException("sendVertexList: Impossible to send " +
-                "to self for vertex index max " + vertexIndexMax);
-        }
-        for (long i = 0; i < vertexList.size(); ++i) {
-            hadoopVertexList.add(
-                (Vertex<I, V, E, M>) vertexList.get((int) i));
+            LOG.info("sendPartitionReq: Sending to " + rpcProxy.getName() +
+                     " " + addr + " from " + workerInfo +
+                     ", with partition " + partition);
+        }
+        for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+            hadoopVertexList.add(vertex);
             if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
                 try {
-                    rpcProxy.putVertexList(vertexIndexMax, hadoopVertexList);
+                    rpcProxy.putVertexList(partition.getPartitionId(),
+                                           hadoopVertexList);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -677,7 +690,8 @@ end[HADOOP_FACEBOOK]*/
         }
         if (hadoopVertexList.size() > 0) {
             try {
-                rpcProxy.putVertexList(vertexIndexMax, hadoopVertexList);
+                rpcProxy.putVertexList(partition.getPartitionId(),
+                                       hadoopVertexList);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
@@ -685,35 +699,48 @@ end[HADOOP_FACEBOOK]*/
     }
 
     /**
-     * Fill the socket address cache for the vertex range
+     * Fill the socket address cache for the worker info and its partition.
      *
-     * @param destVertex vertex
+     * @param workerInfo Worker information to get the socket address
+     * @param partitionId Partition id used as the key into the address cache
      * @return address of the vertex range server containing this vertex
      */
-    private InetSocketAddress getInetSocketAddress(I destVertex) {
-        VertexRange<I, V, E, M> destVertexRange =
-            service.getVertexRange(service.getSuperstep(), destVertex);
-        if (destVertexRange == null) {
-            LOG.error("getInetSocketAddress: No vertexRange found for " +
-                      destVertex);
-            throw new RuntimeException("getInetSocketAddress: Dest vertex " +
-                                       destVertex);
-        }
-
-        synchronized(vertexIndexMapAddressMap) {
+    private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+                                                   int partitionId) {
+        synchronized(partitionIndexAddressMap) {
             InetSocketAddress address =
-                vertexIndexMapAddressMap.get(destVertexRange.getMaxIndex());
+                partitionIndexAddressMap.get(partitionId);
             if (address == null) {
                 address = InetSocketAddress.createUnresolved(
-                    destVertexRange.getHostname(),
-                    destVertexRange.getPort());
-                vertexIndexMapAddressMap.put(destVertexRange.getMaxIndex(),
-                                             address);
+                    workerInfo.getHostname(),
+                    workerInfo.getPort());
+                partitionIndexAddressMap.put(partitionId, address);
+            }
+
+            if (address.getPort() != workerInfo.getPort() ||
+                    !address.getHostName().equals(workerInfo.getHostname())) {
+                throw new IllegalStateException(
+                    "getInetSocketAddress: Impossible that address " +
+                    address + " does not match " + workerInfo);
             }
+
             return address;
         }
     }
 
+    /**
+     * Fill the socket address cache for the partition owner.
+     *
+     * @param destVertex vertex to be sent
+     * @return address of the vertex range server containing this vertex
+     */
+    private InetSocketAddress getInetSocketAddress(I destVertex) {
+        PartitionOwner partitionOwner =
+            service.getVertexPartitionOwner(destVertex);
+        return getInetSocketAddress(partitionOwner.getWorkerInfo(),
+                                    partitionOwner.getPartitionId());
+    }
+
     @Override
     public final void sendMessageReq(I destVertex, M msg) {
         InetSocketAddress addr = getInetSocketAddress(destVertex);
@@ -812,7 +839,8 @@ end[HADOOP_FACEBOOK]*/
         Collection<Future<?>> futures = new ArrayList<Future<?>>();
 
         // randomize peers in order to avoid hotspot on racks
-        List<PeerConnection> peerList = new ArrayList<PeerConnection>(peerConnections.values());
+        List<PeerConnection> peerList =
+            new ArrayList<PeerConnection>(peerConnections.values());
         Collections.shuffle(peerList);
 
         for (PeerConnection pc : peerList) {
@@ -867,13 +895,11 @@ end[HADOOP_FACEBOOK]*/
         }
 
         if (inMessages.size() > 0) {
-            // Assign the appropriate messages to each vertex
-            NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-                service.getCurrentVertexRangeMap();
-            for (VertexRange<I, V, E, M> vertexRange :
-                    vertexRangeMap.values()) {
-                for (BasicVertex<I, V, E, M> vertex :
-                        vertexRange.getVertexMap().values()) {
+            // Assign the messages to each destination vertex (getting rid of
+            // the old ones)
+            for (Partition<I, V, E, M> partition :
+                    service.getPartitionMap().values()) {
+                for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
                     vertex.getMsgList().clear();
                     List<M> msgList = inMessages.get(vertex.getVertexId());
                     if (msgList != null) {
@@ -884,7 +910,8 @@ end[HADOOP_FACEBOOK]*/
                         }
                         for (M msg : msgList) {
                             if (msg == null) {
-                                LOG.warn("null message in inMessages");
+                                LOG.warn("prepareSuperstep: Null message " +
+                                         "in inMessages");
                             }
                         }
                         vertex.getMsgList().addAll(msgList);
@@ -919,10 +946,8 @@ end[HADOOP_FACEBOOK]*/
             VertexResolver<I, V, E, M> vertexResolver =
                 BspUtils.createVertexResolver(
                     conf, service.getGraphMapper().getGraphState());
-            VertexRange<I, V, E, M> vertexRange =
-                service.getVertexRange(service.getSuperstep() - 1, vertexIndex);
             BasicVertex<I, V, E, M> originalVertex =
-                vertexRange.getVertexMap().get(vertexIndex);
+                service.getVertex(vertexIndex);
             List<M> msgList = inMessages.get(vertexIndex);
             if (originalVertex != null) {
                 msgList = originalVertex.getMsgList();
@@ -942,12 +967,19 @@ end[HADOOP_FACEBOOK]*/
                           vertexMutations);
             }
 
+            Partition<I, V, E, M> partition =
+                service.getPartition(vertexIndex);
+            if (partition == null) {
+                throw new IllegalStateException(
+                    "prepareSuperstep: No partition for index " + vertexIndex +
+                    " in " + service.getPartitionMap() + " should have been " +
+                    service.getVertexPartitionOwner(vertexIndex));
+            }
             if (vertex != null) {
                 ((MutableVertex<I, V, E, M>) vertex).setVertexId(vertexIndex);
-                vertexRange.getVertexMap().put(vertex.getVertexId(),
-                                               (Vertex<I, V, E, M>) vertex);
+                partition.putVertex((Vertex<I, V, E, M>) vertex);
             } else if (originalVertex != null) {
-                vertexRange.getVertexMap().remove(originalVertex.getVertexId());
+                partition.removeVertex(originalVertex.getVertexId());
             }
         }
         synchronized (inVertexMutationsMap) {
@@ -956,25 +988,27 @@ end[HADOOP_FACEBOOK]*/
     }
 
     @Override
-    public void cleanCachedVertexAddressMap() {
-        // Fix all the cached inet addresses (remove all changed entries)
-        synchronized (vertexIndexMapAddressMap) {
-            for (Entry<I, VertexRange<I, V, E, M>> entry :
-                service.getVertexRangeMap().entrySet()) {
-               if (vertexIndexMapAddressMap.containsKey(entry.getKey())) {
-                   InetSocketAddress address =
-                       vertexIndexMapAddressMap.get(entry.getKey());
-                   if (!address.getHostName().equals(
-                           entry.getValue().getHostname()) ||
-                           address.getPort() !=
-                           entry.getValue().getPort()) {
-                       LOG.info("prepareSuperstep: Vertex range " +
-                                entry.getKey() + " changed from " +
-                                address + " to " +
-                                entry.getValue().getHostname() + ":" +
-                                entry.getValue().getPort());
-                       vertexIndexMapAddressMap.remove(entry.getKey());
+    public void fixPartitionIdToSocketAddrMap() {
+        // 1. Fix all the cached inet addresses (remove all changed entries)
+        // 2. Connect to any new RPC servers
+        synchronized (partitionIndexAddressMap) {
+            for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
+                InetSocketAddress address =
+                    partitionIndexAddressMap.get(
+                        partitionOwner.getPartitionId());
+               if (address != null &&
+                       (!address.getHostName().equals(
+                        partitionOwner.getWorkerInfo().getHostname()) ||
+                        address.getPort() !=
+                       partitionOwner.getWorkerInfo().getPort())) {
+                   if (LOG.isInfoEnabled()) {
+                       LOG.info("fixPartitionIdToSocketAddrMap: " +
+                                "Partition owner " +
+                                partitionOwner + " changed from " +
+                                address);
                    }
+                   partitionIndexAddressMap.remove(
+                       partitionOwner.getPartitionId());
                }
             }
         }
@@ -993,8 +1027,7 @@ end[HADOOP_FACEBOOK]*/
     }
 
     @Override
-    public Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap() {
-        return inVertexRangeMap;
+    public Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap() {
+        return inPartitionVertexMap;
     }
-
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Tue Nov 15 00:54:20 2011
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.VertexRange;
 /*if_not[HADOOP]
  else[HADOOP]*/
 import org.apache.giraph.hadoop.BspTokenSelector;
@@ -80,12 +79,11 @@ public interface CommunicationsInterface
     /**
      * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
      *
-     * @param vertexIndexMax Max vertex index of {@link VertexRange}
+     * @param partitionId Partition id of the vertices to be added.
      * @param vertexList List of vertices to add
      */
-    void putVertexList(I vertexIndexMax,
-                       VertexList<I, V, E, M> vertexList)
-        throws IOException;
+    void putVertexList(int partitionId,
+                       VertexList<I, V, E, M> vertexList) throws IOException;
 
     /**
      * Add an edge to a remote vertex

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Tue Nov 15 00:54:20 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.token.
 import org.apache.log4j.Logger;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.hadoop.BspPolicyProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -63,7 +64,8 @@ else[HADOOP]*/
     public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
 
     public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
-                             CentralizedServiceWorker<I, V, E, M> service)
+                             CentralizedServiceWorker<I, V, E, M> service,
+                             GraphState<I, V, E, M> graphState)
             throws IOException, UnknownHostException, InterruptedException {
         super(context, service);
     }
@@ -99,7 +101,7 @@ else[HADOOP]*/
             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
         if (conf.getBoolean(
                     hadoopSecurityAuthorization,
-		            false)) {
+                    false)) {
             ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
         }
         JobTokenSecretManager jobTokenSecretManager =
@@ -158,7 +160,7 @@ else[HADOOP]*/
                     CommunicationsInterface.class, versionID, addr, config);
             }
         });
-		return proxy;
+        return proxy;
 /*end[HADOOP]*/
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java Tue Nov 15 00:54:20 2011
@@ -35,6 +35,10 @@ public interface ServerInterface<I exten
                                  M extends Writable>
                                  extends Closeable,
                                  WorkerCommunications<I, V, E, M> {
+    /**
+     * Set up the server.
+     */
+    void setup();
 
     /**
      * Move the in transition messages to the in messages for every vertex and

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java Tue Nov 15 00:54:20 2011
@@ -21,6 +21,9 @@ package org.apache.giraph.comm;
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -35,18 +38,17 @@ import java.util.Map;
  * @param <V extends Writable> vertex value
  * @param <E extends Writable> edge value
  * @param <M extends Writable> message data
- *
- **/
+ */
 @SuppressWarnings("rawtypes")
 public interface WorkerCommunications<I extends WritableComparable,
                                       V extends Writable,
                                       E extends Writable,
                                       M extends Writable> {
     /**
-     * Clean the cached map of vertex addresses that have changed
-     * because of rebalancing.
+     * Fix changes to the workers and the mapping between partitions and
+     * workers.
      */
-    void cleanCachedVertexAddressMap();
+    void fixPartitionIdToSocketAddrMap();
 
     /**
      * Sends a message to destination vertex.
@@ -57,13 +59,13 @@ public interface WorkerCommunications<I 
     void sendMessageReq(I id, M msg);
 
     /**
-     * Sends a list of vertices to the appropriate vertex range owner
+     * Sends a partition to the appropriate partition owner
      *
-     * @param vertexIndexMax Vertex range that the vertices belong to
-     * @param vertexList List of vertices assigned to the vertexRangeIndex
+     * @param workerInfo Worker that owns the partition
+     * @param partition Partition to send
      */
-    void sendVertexListReq(I vertexIndexMax,
-                           List<BasicVertex<I, V, E, M>> vertexList);
+    void sendPartitionReq(WorkerInfo workerInfo,
+                          Partition<I, V, E, M> partition);
 
     /**
      * Sends a request to the appropriate vertex range owner to add an edge
@@ -107,5 +109,5 @@ public interface WorkerCommunications<I 
      *
      * @return map of vertex ranges to vertices
      */
-    Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap();
+    Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java Tue Nov 15 00:54:20 2011
@@ -42,10 +42,8 @@ public abstract class GeneratedVertexInp
     @Override
     public List<InputSplit> getSplits(JobContext context, int numWorkers)
         throws IOException, InterruptedException {
-        /*
-         * This is meaningless, the VertexReader will generate all the test
-         * data.
-         */
+        // This is meaningless; the VertexReader will generate all the test
+        // data.
         List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
         for (int i = 0; i < numWorkers; ++i) {
             inputSplitList.add(new BspInputSplit(i, numWorkers));

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Tue Nov 15 00:54:20 2011
@@ -46,12 +46,17 @@ public abstract class GeneratedVertexRea
     protected long totalRecords = 0;
     /** The input split from initialize(). */
     protected BspInputSplit inputSplit = null;
+    /** Reverse the id order? */
+    protected boolean reverseIdOrder;
 
     protected Configuration configuration = null;
 
     public static final String READER_VERTICES =
-        "TestVertexReader.reader_vertices";
+        "GeneratedVertexReader.reader_vertices";
     public static final long DEFAULT_READER_VERTICES = 10;
+    public static final String REVERSE_ID_ORDER =
+        "GeneratedVertexReader.reverseIdOrder";
+    public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
 
     public GeneratedVertexReader() {
     }
@@ -62,8 +67,11 @@ public abstract class GeneratedVertexRea
             throws IOException {
         configuration = context.getConfiguration();
         totalRecords = configuration.getLong(
-                GeneratedVertexReader.READER_VERTICES,
-                GeneratedVertexReader.DEFAULT_READER_VERTICES);
+            GeneratedVertexReader.READER_VERTICES,
+            GeneratedVertexReader.DEFAULT_READER_VERTICES);
+        reverseIdOrder = configuration.getBoolean(
+            GeneratedVertexReader.REVERSE_ID_ORDER,
+            GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
         this.inputSplit = (BspInputSplit) inputSplit;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java Tue Nov 15 00:54:20 2011
@@ -49,5 +49,5 @@ public class MaxAggregator implements Ag
   public DoubleWritable createAggregatedValue() {
       return new DoubleWritable();
   }
-  
+
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java Tue Nov 15 00:54:20 2011
@@ -35,7 +35,7 @@ public class MinAggregator implements Ag
       double val = value.get();
       if (val < min) {
           min = val;
-      }   
+      }
   }
 
   public void setAggregatedValue(DoubleWritable value) {
@@ -49,5 +49,5 @@ public class MinAggregator implements Ag
   public DoubleWritable createAggregatedValue() {
       return new DoubleWritable();
   }
-  
+
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Tue Nov 15 00:54:20 2011
@@ -54,12 +54,13 @@ public class SimpleMutateGraphVertex ext
     }
 
     @Override
-    public void compute(Iterator<DoubleWritable> msgIterator) throws IOException {
+    public void compute(Iterator<DoubleWritable> msgIterator)
+            throws IOException {
 
     	SimpleMutateGraphVertexWorkerContext workerContext =
     		(SimpleMutateGraphVertexWorkerContext) getWorkerContext();
-
-    	if (getSuperstep() == 1) {
+    	if (getSuperstep() == 0) {
+    	} else if (getSuperstep() == 1) {
             // Send messages to vertices that are sure not to exist
             // (creating them)
             LongWritable destVertexId =
@@ -67,18 +68,18 @@ public class SimpleMutateGraphVertex ext
             sendMsg(destVertexId, new DoubleWritable(0.0));
         } else if (getSuperstep() == 2) {
         } else if (getSuperstep() == 3) {
-        	long vertex_count = workerContext.getVertexCount();
-            if (vertex_count * 2 != getNumVertices()) {
+        	long vertexCount = workerContext.getVertexCount();
+            if (vertexCount * 2 != getNumVertices()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumVertices() +
-                    " vertices when should have " + vertex_count * 2 +
+                    " vertices when should have " + vertexCount * 2 +
                     " on superstep " + getSuperstep());
             }
-            long edge_count = workerContext.getEdgeCount();
-            if (edge_count != getNumEdges()) {
+            long edgeCount = workerContext.getEdgeCount();
+            if (edgeCount != getNumEdges()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumEdges() +
-                    " edges when should have " + edge_count +
+                    " edges when should have " + edgeCount +
                     " on superstep " + getSuperstep());
             }
             // Create vertices that are sure not to exist (doubling vertices)
@@ -94,18 +95,18 @@ public class SimpleMutateGraphVertex ext
                                getVertexId(), new FloatWritable(0.0f)));
         } else if (getSuperstep() == 4) {
         } else if (getSuperstep() == 5) {
-        	long vertex_count = workerContext.getVertexCount();
-            if (vertex_count * 2 != getNumVertices()) {
+        	long vertexCount = workerContext.getVertexCount();
+            if (vertexCount * 2 != getNumVertices()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumVertices() +
-                    " when should have " + vertex_count * 2 +
+                    " when should have " + vertexCount * 2 +
                     " on superstep " + getSuperstep());
             }
-            long edge_count = workerContext.getEdgeCount();
-            if (edge_count + vertex_count != getNumEdges()) {
+            long edgeCount = workerContext.getEdgeCount();
+            if (edgeCount + vertexCount != getNumEdges()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumEdges() +
-                    " edges when should have " + edge_count + vertex_count +
+                    " edges when should have " + edgeCount + vertexCount +
                     " on superstep " + getSuperstep());
             }
             // Remove the edges created in superstep 3
@@ -141,7 +142,8 @@ public class SimpleMutateGraphVertex ext
         }
     }
 
-    public static class SimpleMutateGraphVertexWorkerContext extends WorkerContext {
+    public static class SimpleMutateGraphVertexWorkerContext
+            extends WorkerContext {
         /** Cached vertex count */
         private long vertexCount;
         /** Cached edge count */
@@ -158,8 +160,11 @@ public class SimpleMutateGraphVertex ext
 		@Override
 		public void postApplication() { }
 
+        @Override
+        public void preSuperstep() { }
+
 		@Override
-		public void preSuperstep() {
+		public void postSuperstep() {
 			vertexCount = getNumVertices();
 			edgeCount = getNumEdges();
 			if (getSuperstep() == 1) {
@@ -172,9 +177,6 @@ public class SimpleMutateGraphVertex ext
 			edgesRemoved = 0;
 		}
 
-		@Override
-		public void postSuperstep() { }
-
 		public long getVertexCount() {
 			return vertexCount;
 		}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Nov 15 00:54:20 2011
@@ -56,7 +56,8 @@ public class SimpleSuperstepVertex exten
      * Simple VertexReader that supports {@link SimpleSuperstepVertex}
      */
     public static class SimpleSuperstepVertexReader extends
-            GeneratedVertexReader<LongWritable, IntWritable, FloatWritable, IntWritable> {
+            GeneratedVertexReader<LongWritable, IntWritable,
+            FloatWritable, IntWritable> {
         /** Class logger */
         private static final Logger LOG =
             Logger.getLogger(SimpleSuperstepVertexReader.class);
@@ -70,18 +71,28 @@ public class SimpleSuperstepVertex exten
         }
 
         @Override
-        public BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> getCurrentVertex()
-            throws IOException, InterruptedException {
-            BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
-                BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>createVertex(
+        public BasicVertex<LongWritable, IntWritable, FloatWritable,
+                IntWritable> getCurrentVertex()
+                throws IOException, InterruptedException {
+            BasicVertex<LongWritable, IntWritable,
+                        FloatWritable, IntWritable> vertex =
+                BspUtils.<LongWritable, IntWritable,
+                          FloatWritable, IntWritable>createVertex(
                     configuration);
-            LongWritable vertexId = new LongWritable(
-                (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
-            IntWritable vertexValue = new IntWritable((int) (vertexId.get() * 10));
+            long tmpId = reverseIdOrder ?
+                ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+                    recordsRead - 1 :
+                (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+            LongWritable vertexId = new LongWritable(tmpId);
+            IntWritable vertexValue =
+                new IntWritable((int) (vertexId.get() * 10));
             Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
-            long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
+            long destVertexId =
+                (vertexId.get() + 1) %
+                    (inputSplit.getNumSplits() * totalRecords);
             float edgeValue = vertexId.get() * 100f;
-            edgeMap.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+            edgeMap.put(new LongWritable(destVertexId),
+                        new FloatWritable(edgeValue));
             vertex.initialize(vertexId, vertexValue, edgeMap, null);
             ++recordsRead;
             if (LOG.isInfoEnabled()) {

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.*;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An example that simply uses its id, value, and edges to compute new data
+ * every iteration to verify that messages are sent and received at the
+ * appropriate location and superstep.
+ */
+public class VerifyMessage {
+    public static class VerifiableMessage implements Writable {
+        /** Superstep sent on */
+        public long superstep;
+        /** Source vertex id */
+        public long sourceVertexId;
+        /** Value */
+        public float value;
+
+        public VerifiableMessage() {}
+
+        public VerifiableMessage(
+                long superstep, long sourceVertexId, float value) {
+            this.superstep = superstep;
+            this.sourceVertexId = sourceVertexId;
+            this.value = value;
+        }
+
+        @Override
+        public void readFields(DataInput input) throws IOException {
+            superstep = input.readLong();
+            sourceVertexId = input.readLong();
+            value = input.readFloat();
+        }
+
+        @Override
+        public void write(DataOutput output) throws IOException {
+            output.writeLong(superstep);
+            output.writeLong(sourceVertexId);
+            output.writeFloat(value);
+        }
+
+        @Override
+        public String toString() {
+            return "(superstep=" + superstep + ",sourceVertexId=" +
+                sourceVertexId + ",value=" + value + ")";
+        }
+    }
+
+    public static class VerifyMessageVertex extends
+            Vertex<LongWritable, IntWritable, FloatWritable, VerifiableMessage> {
+        /** User can access this after the application finishes if local */
+        public static long finalSum;
+        /** Number of supersteps to run (6 by default) */
+        private static int supersteps = 6;
+        /** Class logger */
+        private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
+
+        /** Dynamically set number of supersteps */
+        public static final String SUPERSTEP_COUNT =
+            "verifyMessageVertex.superstepCount";
+
+        public static class VerifyMessageVertexWorkerContext extends
+                WorkerContext {
+            @Override
+            public void preApplication() throws InstantiationException,
+                    IllegalAccessException {
+                registerAggregator(LongSumAggregator.class.getName(),
+                    LongSumAggregator.class);
+                LongSumAggregator sumAggregator = (LongSumAggregator)
+                    getAggregator(LongSumAggregator.class.getName());
+                sumAggregator.setAggregatedValue(new LongWritable(0));
+                supersteps = getContext().getConfiguration().getInt(
+                    SUPERSTEP_COUNT, supersteps);
+            }
+
+            @Override
+            public void postApplication() {
+                LongSumAggregator sumAggregator = (LongSumAggregator)
+                    getAggregator(LongSumAggregator.class.getName());
+                finalSum = sumAggregator.getAggregatedValue().get();
+            }
+
+            @Override
+            public void preSuperstep() {
+                useAggregator(LongSumAggregator.class.getName());
+            }
+
+            @Override
+            public void postSuperstep() {}
+        }
+
+        @Override
+        public void compute(Iterator<VerifiableMessage> msgIterator) {
+            LongSumAggregator sumAggregator = (LongSumAggregator)
+                getAggregator(LongSumAggregator.class.getName());
+            if (getSuperstep() > supersteps) {
+                voteToHalt();
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: " + sumAggregator);
+            }
+            sumAggregator.aggregate(getVertexId().get());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: sum = " +
+                          sumAggregator.getAggregatedValue().get() +
+                          " for vertex " + getVertexId());
+            }
+            float msgValue = 0.0f;
+            while (msgIterator.hasNext()) {
+                VerifiableMessage msg = msgIterator.next();
+                msgValue += msg.value;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("compute: got msg = " + msg +
+                              " for vertex id " + getVertexId() +
+                              ", vertex value " + getVertexValue() +
+                              " on superstep " + getSuperstep());
+                }
+                if (msg.superstep != getSuperstep() - 1) {
+                    throw new IllegalStateException(
+                        "compute: Impossible to not get a messsage from " +
+                        "the previous superstep, current superstep = " +
+                        getSuperstep());
+                }
+                if ((msg.sourceVertexId != getVertexId().get() - 1) &&
+                        (getVertexId().get() != 0)) {
+                    throw new IllegalStateException(
+                        "compute: Impossible that this message didn't come " +
+                        "from the previous vertex and came from " +
+                        msg.sourceVertexId);
+                }
+            }
+            int vertexValue = getVertexValue().get();
+            setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: vertex " + getVertexId() +
+                          " has value " + getVertexValue() +
+                          " on superstep " + getSuperstep());
+            }
+            for (LongWritable targetVertexId : this) {
+                FloatWritable edgeValue = getEdgeValue(targetVertexId);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("compute: vertex " + getVertexId() +
+                              " sending edgeValue " + edgeValue +
+                              " vertexValue " + vertexValue +
+                              " total " +
+                              (edgeValue.get() + (float) vertexValue) +
+                              " to vertex " + targetVertexId +
+                              " on superstep " + getSuperstep());
+                }
+                edgeValue.set(edgeValue.get() + (float) vertexValue);
+                addEdge(targetVertexId, edgeValue);
+                sendMsg(targetVertexId,
+                    new VerifiableMessage(
+                        getSuperstep(), getVertexId().get(), edgeValue.get()));
+            }
+        }
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Tue Nov 15 00:54:20 2011
@@ -45,8 +45,11 @@ public abstract class BasicVertex<I exte
     private GraphState<I,V,E,M> graphState;
     /** Configuration */
     private Configuration conf;
+    /** If true, do not do any more computation on this vertex. */
+    boolean halt = false;
 
-    public abstract void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
+    public abstract void initialize(
+        I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
 
     /**
      * Must be defined by user to do computation on a single Vertex.
@@ -151,7 +154,7 @@ public abstract class BasicVertex<I exte
             throw new IllegalArgumentException(
                 "sendMsg: Cannot send null message to " + id);
         }
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             sendMessageReq(id, msg);
     }
 
@@ -162,16 +165,20 @@ public abstract class BasicVertex<I exte
 
     /**
      * After this is called, the compute() code will no longer be called for
-     * this vertice unless a message is sent to it.  Then the compute() code
+     * this vertex unless a message is sent to it.  Then the compute() code
      * will be called once again until this function is called.  The
      * application finishes only when all vertices vote to halt.
      */
-    public abstract void voteToHalt();
+    public void voteToHalt() {
+        halt = true;
+    }
 
     /**
      * Is this vertex done?
      */
-    public abstract boolean isHalted();
+    public boolean isHalted() {
+        return halt;
+    }
 
     /**
      *  Get the list of incoming messages from the previous superstep.  Same as
@@ -202,13 +209,13 @@ public abstract class BasicVertex<I exte
      *
      * @return Mapper context
      */
-    public Mapper.Context getContext() {
-        return getGraphState().getContext();
-    }
-    
+     public Mapper.Context getContext() {
+         return getGraphState().getContext();
+     }
+
     /**
      * Get the worker context
-     * 
+     *
      * @return WorkerContext context
      */
     public WorkerContext getWorkerContext() {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Nov 15 00:54:20 2011
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.CentralizedService;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.giraph.zk.ZooKeeperExt;
@@ -36,7 +37,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -48,8 +48,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
 import java.util.TreeMap;
 
 /**
@@ -75,15 +73,10 @@ public abstract class BspService <
     /** InputSplit reservation or finished notification and synchronization */
     private final BspEvent inputSplitsStateChanged =
         new PredicateLock();
-    /** Are the worker assignments of vertex ranges ready? */
-    private final BspEvent vertexRangeAssignmentsReadyChanged =
-        new PredicateLock();
-    /** Have the vertex range exchange children changed? */
-    private final BspEvent vertexRangeExchangeChildrenChanged =
-        new PredicateLock();
-    /** Are the vertex range exchanges done? */
-    private final BspEvent vertexRangeExchangeFinishedChanged =
+    /** Are the partition assignments to workers ready? */
+    private final BspEvent partitionAssignmentsReadyChanged =
         new PredicateLock();
+
     /** Application attempt changed */
     private final BspEvent applicationAttemptChanged =
         new PredicateLock();
@@ -117,6 +110,8 @@ public abstract class BspService <
     private final String hostname;
     /** Combination of hostname '_' partition (unique id) */
     private final String hostnamePartitionId;
+    /** Graph partitioner */
+    private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
     /** Mapper that will do the graph computation */
     private final GraphMapper<I, V, E, M> graphMapper;
     /** Class logger */
@@ -125,11 +120,6 @@ public abstract class BspService <
     private final FileSystem fs;
     /** Checkpoint frequency */
     private int checkpointFrequency = -1;
-    /** Vertex range map based on the superstep below */
-    private NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-        new TreeMap<I, VertexRange<I, V, E, M>>();
-    /** Vertex range set is based on this superstep */
-    private long vertexRangeSuperstep = UNSET_SUPERSTEP;
     /** Map of aggregators */
     private Map<String, Aggregator<Writable>> aggregatorMap =
         new TreeMap<String, Aggregator<Writable>>();
@@ -159,19 +149,17 @@ public abstract class BspService <
     public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
     public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
     public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
-    public static final String VERTEX_RANGE_ASSIGNMENTS_DIR =
-        "/_vertexRangeAssignments";
-    public static final String VERTEX_RANGE_EXCHANGE_DIR =
-        "/_vertexRangeExchangeDir";
-    public static final String VERTEX_RANGE_EXCHANGED_FINISHED_NODE =
-        "/_vertexRangeExchangeFinished";
+    public static final String PARTITION_ASSIGNMENTS_DIR =
+        "/_partitionAssignments";
+    public static final String PARTITION_EXCHANGE_DIR =
+        "/_partitionExchangeDir";
     public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
     public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
 
     public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
         "_aggregatorValueArrayKey";
-    public static final String JSONOBJ_VERTEX_RANGE_STAT_ARRAY_KEY =
-        "_vertexRangeStatArrayKey";
+    public static final String JSONOBJ_PARTITION_STATS_KEY =
+            "_partitionStatsKey";
     public static final String JSONOBJ_FINISHED_VERTICES_KEY =
         "_verticesFinishedKey";
     public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
@@ -270,31 +258,36 @@ public abstract class BspService <
     }
 
     /**
-     * Generate the worker "healthy" directory path for a superstep
+     * Generate the worker information "healthy" directory path for a
+     * superstep
      *
      * @param attempt application attempt number
      * @param superstep superstep to use
      * @return directory path based on the a superstep
      */
-    final public String getWorkerHealthyPath(long attempt, long superstep) {
+    final public String getWorkerInfoHealthyPath(long attempt,
+                                                 long superstep) {
         return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
             SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
     }
 
     /**
-     * Generate the worker "unhealthy" directory path for a superstep
+     * Generate the worker information "unhealthy" directory path for a
+     * superstep
      *
      * @param attempt application attempt number
      * @param superstep superstep to use
      * @return directory path based on the a superstep
      */
-    final public String getWorkerUnhealthyPath(long attempt, long superstep) {
+    final public String getWorkerInfoUnhealthyPath(long attempt,
+                                                   long superstep) {
         return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
             SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
     }
 
     /**
-     * Generate the worker "finished" directory path for a superstep
+     * Generate the worker "finished" directory path for a
+     * superstep
      *
      * @param attempt application attempt number
      * @param superstep superstep to use
@@ -306,44 +299,36 @@ public abstract class BspService <
     }
 
     /**
-     * Generate the "vertex range assignments" directory path for a superstep
+     * Generate the "partition assignments" directory path for a superstep
      *
      * @param attempt application attempt number
      * @param superstep superstep to use
      * @return directory path based on the a superstep
      */
-    final public String getVertexRangeAssignmentsPath(long attempt,
-                                                      long superstep) {
+    final public String getPartitionAssignmentsPath(long attempt,
+                                                    long superstep) {
         return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_ASSIGNMENTS_DIR;
+            SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
     }
 
     /**
-     * Generate the "vertex range exchange" directory path for a superstep
+     * Generate the "partition exchange" directory path for a superstep
      *
      * @param attempt application attempt number
      * @param superstep superstep to use
      * @return directory path based on the a superstep
      */
-    final public String getVertexRangeExchangePath(long attempt,
-                                                   long superstep) {
+    final public String getPartitionExchangePath(long attempt,
+                                                 long superstep) {
         return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_EXCHANGE_DIR;
+            SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
     }
 
-    /**
-     * Generate the "vertex range exchange finished" directory path for
-     * a superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getVertexRangeExchangeFinishedPath(long attempt,
-                                                           long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep +
-            VERTEX_RANGE_EXCHANGED_FINISHED_NODE;
+    final public String getPartitionExchangeWorkerPath(long attempt,
+                                                       long superstep,
+                                                       WorkerInfo workerInfo) {
+        return getPartitionExchangePath(attempt, superstep) +
+            "/" + workerInfo.getHostnameId();
     }
 
     /**
@@ -516,17 +501,10 @@ public abstract class BspService <
         return inputSplitsStateChanged;
     }
 
-    final public BspEvent getVertexRangeAssignmentsReadyChangedEvent() {
-        return vertexRangeAssignmentsReadyChanged;
-    }
-
-    final public BspEvent getVertexRangeExchangeChildrenChangedEvent() {
-        return vertexRangeExchangeChildrenChanged;
+    final public BspEvent getPartitionAssignmentsReadyChangedEvent() {
+        return partitionAssignmentsReadyChanged;
     }
 
-    final public BspEvent getVertexRangeExchangeFinishedChangedEvent() {
-        return vertexRangeExchangeFinishedChanged;
-    }
 
     final public BspEvent getApplicationAttemptChangedEvent() {
         return applicationAttemptChanged;
@@ -597,9 +575,7 @@ public abstract class BspService <
         registerBspEvent(workerHealthRegistrationChanged);
         registerBspEvent(inputSplitsAllReadyChanged);
         registerBspEvent(inputSplitsStateChanged);
-        registerBspEvent(vertexRangeAssignmentsReadyChanged);
-        registerBspEvent(vertexRangeExchangeChildrenChanged);
-        registerBspEvent(vertexRangeExchangeFinishedChanged);
+        registerBspEvent(partitionAssignmentsReadyChanged);
         registerBspEvent(applicationAttemptChanged);
         registerBspEvent(superstepFinished);
         registerBspEvent(masterElectionChildrenChanged);
@@ -625,6 +601,9 @@ public abstract class BspService <
             throw new RuntimeException(e);
         }
         this.hostnamePartitionId = hostname + "_" + getTaskPartition();
+        this.graphPartitionerFactory =
+            BspUtils.<I, V, E, M>createGraphPartitioner(conf);
+
         this.checkpointFrequency =
             conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
                           GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
@@ -807,93 +786,6 @@ public abstract class BspService <
     }
 
     /**
-     * Gets the storable vertex range map, bypasses the cache.  Used by workers
-     * to dump the vertices into.
-     *
-     * @return Actual map of max vertex range indices to vertex ranges
-     */
-    public NavigableMap<I, VertexRange<I, V, E, M>>
-            getStorableVertexRangeMap() {
-        return vertexRangeMap;
-    }
-
-    /**
-     * Based on a superstep, get the mapping of vertex range maxes to vertex
-     * ranges.  This can be used to look up a particular vertex.
-     *
-     * @param superstep Superstep to get the vertex ranges for
-     * @return Cached map of max vertex range indices to vertex ranges
-     */
-    public NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap(
-            long superstep) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("getVertexRangeMap: Current superstep = " +
-                      getSuperstep() + ", desired superstep = " + superstep);
-        }
-
-        if (vertexRangeSuperstep == superstep) {
-            return vertexRangeMap;
-        }
-        vertexRangeSuperstep = superstep;
-        NavigableMap<I, VertexRange<I, V, E, M>> nextVertexRangeMap =
-            new TreeMap<I, VertexRange<I, V, E, M>>();
-        String vertexRangeAssignmentsPath =
-            getVertexRangeAssignmentsPath(getApplicationAttempt(),
-                                          superstep);
-        try {
-            JSONArray vertexRangeAssignmentsArray =
-                new JSONArray(
-                    new String(getZkExt().getData(vertexRangeAssignmentsPath,
-                                                  false,
-                                                  null)));
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getVertexRangeSet: Found vertex ranges " +
-                          vertexRangeAssignmentsArray.toString() +
-                          " on superstep " + superstep);
-            }
-            for (int i = 0; i < vertexRangeAssignmentsArray.length(); ++i) {
-                JSONObject vertexRangeObj =
-                    vertexRangeAssignmentsArray.getJSONObject(i);
-                Class<I> indexClass =
-                    BspUtils.getVertexIndexClass(getConfiguration());
-                VertexRange<I, V, E, M> vertexRange =
-                    new VertexRange<I, V, E, M>(indexClass,
-                            vertexRangeObj);
-                if (nextVertexRangeMap.containsKey(vertexRange.getMaxIndex())) {
-                    throw new IllegalStateException(
-                        "getVertexRangeMap: Impossible that vertex range " +
-                        "max " + vertexRange.getMaxIndex() +
-                        " already exists!  Duplicate vertex ranges include " +
-                        nextVertexRangeMap.get(vertexRange.getMaxIndex()) +
-                        " and " + vertexRange);
-                }
-                nextVertexRangeMap.put(vertexRange.getMaxIndex(), vertexRange);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        // Copy over the vertices to the vertex ranges
-        for (Entry<I, VertexRange<I, V, E, M>> entry :
-            nextVertexRangeMap.entrySet()) {
-            if (!vertexRangeMap.containsKey(entry.getKey())) {
-                continue;
-            }
-            VertexRange<I, V, E, M> vertexRange =
-                vertexRangeMap.get(entry.getKey());
-            entry.getValue().getVertexMap().putAll(
-                vertexRange.getVertexMap());
-        }
-        vertexRangeMap = nextVertexRangeMap;
-        return vertexRangeMap;
-    }
-
-    public NavigableMap<I, VertexRange<I, V, E, M>> getCurrentVertexRangeMap()
-    {
-        return vertexRangeMap;
-    }
-
-    /**
      * Register an aggregator with name.
      *
      * @param name Name of the aggregator
@@ -948,6 +840,15 @@ public abstract class BspService <
     }
 
     /**
+     * Subclasses can use this to instantiate their respective partitioners
+     *
+     * @return Instantiated graph partitioner factory
+     */
+    protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
+        return graphPartitionerFactory;
+    }
+
+    /**
      * Derived classes that want additional ZooKeeper events to take action
      * should override this.
      *
@@ -982,6 +883,8 @@ public abstract class BspService <
                     LOG.info("process: Asynchronous connection complete.");
                 }
                 connectedEvent.signal();
+            } else {
+                LOG.warn("process: Got unknown null path event " + event);
             }
             return;
         }
@@ -1025,30 +928,13 @@ public abstract class BspService <
             }
             inputSplitsStateChanged.signal();
             eventProcessed = true;
-        } else if (event.getPath().contains(VERTEX_RANGE_ASSIGNMENTS_DIR) &&
-                event.getType() == EventType.NodeCreated) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: vertexRangeAssignmentsReadyChanged " +
-                         "(vertex ranges are assigned)");
-            }
-            vertexRangeAssignmentsReadyChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().contains(VERTEX_RANGE_EXCHANGE_DIR) &&
-                event.getType() == EventType.NodeChildrenChanged) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: vertexRangeExchangeChildrenChanged " +
-                         "(ready to exchanged vertex ranges)");
-            }
-            vertexRangeExchangeChildrenChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().contains(
-                VERTEX_RANGE_EXCHANGED_FINISHED_NODE) &&
+        } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
                 event.getType() == EventType.NodeCreated) {
             if (LOG.isInfoEnabled()) {
-                LOG.info("process: vertexRangeExchangeFinishedChanged " +
-                         "(vertex range exchange done)");
+                LOG.info("process: partitionAssignmentsReadyChanged " +
+                         "(partitions are assigned)");
             }
-            vertexRangeExchangeFinishedChanged.signal();
+            partitionAssignmentsReadyChanged.signal();
             eventProcessed = true;
         } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
                 event.getType() == EventType.NodeCreated) {



Mime
View raw message