incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1201987 [4/5] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Date Tue, 15 Nov 2011 00:54:22 GMT
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov 15 00:54:20 2011
@@ -19,9 +19,9 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.RPCCommunications;
-import org.apache.giraph.comm.ServerInterface;
-import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
@@ -36,10 +36,11 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
  * This mapper that will execute the BSP graph tasks.  Since this mapper will
@@ -56,8 +57,6 @@ public class GraphMapper<I extends Writa
     CentralizedServiceWorker<I, V, E, M> serviceWorker;
     /** Coordination service master thread */
     Thread masterThread = null;
-    /** Communication service */
-    private ServerInterface<I, V, E, M> commService = null;
     /** The map should be run exactly once, or else there is a problem. */
     boolean mapAlreadyRun = false;
     /** Manages the ZooKeeper servers if necessary (dynamic startup) */
@@ -92,16 +91,6 @@ public class GraphMapper<I extends Writa
     }
 
     /**
-     * Get the worker communications, a subset of the functionality.
-     *
-     * @return worker communication object
-     */
-    public final WorkerCommunications<I, V, E, M>
-            getWorkerCommunications() {
-        return commService;
-    }
-
-    /**
      * Get the aggregator usage, a subset of the functionality
      *
      * @return Aggregator usage interface
@@ -109,7 +98,7 @@ public class GraphMapper<I extends Writa
     public final AggregatorUsage getAggregatorUsage() {
         return serviceWorker;
     }
-    
+
     public final WorkerContext getWorkerContext() {
     	return serviceWorker.getWorkerContext();
     }
@@ -260,40 +249,6 @@ public class GraphMapper<I extends Writa
                     ", vertex output format - " + classList.get(2));
             }
         }
-        // Vertex range balancer might never select the types
-        Class<? extends VertexRangeBalancer<I, V, E, M>>
-            vertexRangeBalancerClass =
-                BspUtils.<I, V, E, M>getVertexRangeBalancerClass(conf);
-        classList = ReflectionUtils.<VertexRangeBalancer>getTypeArguments(
-            VertexRangeBalancer.class, vertexRangeBalancerClass);
-        if (classList.get(0) != null &&
-                !vertexIndexType.equals(classList.get(0))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex index types don't match, " +
-                "vertex - " + vertexIndexType +
-                ", vertex range balancer - " + classList.get(0));
-        }
-        if (classList.get(1) != null &&
-                !vertexValueType.equals(classList.get(1))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex value types don't match, " +
-                "vertex - " + vertexValueType +
-                ", vertex range balancer - " + classList.get(1));
-        }
-        if (classList.get(2) != null &&
-                !edgeValueType.equals(classList.get(2))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex range balancer - " + classList.get(2));
-        }
-        if (classList.get(3) != null &&
-                !messageValueType.equals(classList.get(3))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Message value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex range balancer - " + classList.get(3));
-        }
         // Vertex resolver might never select the types
         Class<? extends VertexResolver<I, V, E, M>>
             vertexResolverClass =
@@ -487,7 +442,11 @@ public class GraphMapper<I extends Writa
                     LOG.info("setup: Starting up BspServiceWorker...");
                 }
                 serviceWorker = new BspServiceWorker<I, V, E, M>(
-                    serverPortList, sessionMsecTimeout, context, this);
+                    serverPortList,
+                    sessionMsecTimeout,
+                    context,
+                    this,
+                    graphState);
                 if (LOG.isInfoEnabled()) {
                     LOG.info("setup: Registering health of this worker...");
                 }
@@ -517,8 +476,7 @@ public class GraphMapper<I extends Writa
         if (done == true) {
             return;
         }
-        if ((serviceWorker != null) &&
-                (serviceWorker.getTotalVertices() == 0)) {
+        if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) {
             return;
         }
 
@@ -536,9 +494,8 @@ public class GraphMapper<I extends Writa
         }
         mapAlreadyRun = true;
 
-        graphState.setSuperstep(serviceWorker.getSuperstep()).setContext(context)
-                  .setGraphMapper(this).setNumEdges(serviceWorker.getTotalEdges())
-                  .setNumVertices(serviceWorker.getTotalVertices());
+        graphState.setSuperstep(serviceWorker.getSuperstep()).
+            setContext(context).setGraphMapper(this);
 
         try {
             serviceWorker.getWorkerContext().preApplication();
@@ -553,22 +510,16 @@ public class GraphMapper<I extends Writa
         }
         context.progress();
 
-        long workerFinishedVertices = 0;
-        long workerVertices = 0;
-        long workerEdges = 0;
+        List<PartitionStats> partitionStatsList =
+            new ArrayList<PartitionStats>();
         long workerSentMessages = 0;
         do {
             long superstep = serviceWorker.getSuperstep();
 
-            graphState.setSuperstep(superstep)
-                      .setNumEdges(serviceWorker.getTotalEdges())
-                      .setNumVertices(serviceWorker.getTotalVertices());
+            graphState.setSuperstep(superstep);
 
-            if (commService != null) {
-                commService.prepareSuperstep();
-            }
-
-            serviceWorker.startSuperstep();
+            Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
+                serviceWorker.startSuperstep();
             if (zkManager != null && zkManager.runsZooKeeper()) {
                 if (LOG.isInfoEnabled()) {
                     LOG.info("map: Chosen to run ZooKeeper...");
@@ -582,16 +533,10 @@ public class GraphMapper<I extends Writa
                           " maxMem=" + Runtime.getRuntime().maxMemory() +
                           " freeMem=" + Runtime.getRuntime().freeMemory());
             }
-            if ((superstep >= BspService.INPUT_SUPERSTEP) &&
-                    (commService == null)) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("map: Starting communication service on " +
-                             "superstep " + superstep);
-                }
-                commService =
-                    new RPCCommunications<I, V, E, M>(context,
-                                                      serviceWorker);
-            }
+            context.progress();
+
+            serviceWorker.exchangeVertexPartitions(
+                masterAssignedPartitionOwners);
             context.progress();
 
             // Might need to restart from another superstep
@@ -606,51 +551,38 @@ public class GraphMapper<I extends Writa
                 serviceWorker.storeCheckpoint();
             }
 
-            serviceWorker.exchangeVertexRanges();
-            context.progress();
-
             serviceWorker.getWorkerContext().setGraphState(graphState);
             serviceWorker.getWorkerContext().preSuperstep();
             context.progress();
 
-            workerFinishedVertices = 0;
-            workerVertices = 0;
-            workerEdges = 0;
+            partitionStatsList.clear();
             workerSentMessages = 0;
-            for (Map.Entry<I, VertexRange<I, V, E, M>> entry :
-                serviceWorker.getVertexRangeMap().entrySet()) {
-                // Only report my own vertex range stats
-                if (!entry.getValue().getHostname().equals(
-                        serviceWorker.getHostname()) ||
-                        (entry.getValue().getPort() !=
-                        serviceWorker.getPort())) {
-                    continue;
-                }
-
-                for (BasicVertex<I, V, E, M> vertex :
-                        entry.getValue().getVertexMap().values()) {
+            for (Partition<I, V, E, M> partition :
+                    serviceWorker.getPartitionMap().values()) {
+                PartitionStats partitionStats =
+                    new PartitionStats(partition.getPartitionId(), 0, 0, 0);
+                for (BasicVertex<I, V, E, M> basicVertex :
+                        partition.getVertices()) {
                     // Make sure every vertex has the current
                     // graphState before computing
-
-                    vertex.setGraphState(graphState);
-                    if (vertex.isHalted() && !vertex.getMsgList().isEmpty()) {
-                      // TODO FIXME: if this is not a subclass of Vertex, this will blow up!
-                        Vertex<I, V, E, M> activatedVertex =
-                            (Vertex<I, V, E, M>) vertex;
-                        activatedVertex.halt = false;
+                    basicVertex.setGraphState(graphState);
+                    if (basicVertex.isHalted() &&
+                            !basicVertex.getMsgList().isEmpty()) {
+                        basicVertex.halt = false;
                     }
-                    if (!vertex.isHalted()) {
+                    if (!basicVertex.isHalted()) {
                         Iterator<M> vertexMsgIt =
-                            vertex.getMsgList().iterator();
+                            basicVertex.getMsgList().iterator();
                         context.progress();
-                        vertex.compute(vertexMsgIt);
+                        basicVertex.compute(vertexMsgIt);
                     }
-                    if (vertex.isHalted()) {
-                        ++workerFinishedVertices;
+                    if (basicVertex.isHalted()) {
+                        partitionStats.incrFinishedVertexCount();
                     }
-                    ++workerVertices;
-                    workerEdges += vertex.getNumOutEdges();
+                    partitionStats.incrVertexCount();
+                    partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
                 }
+                partitionStatsList.add(partitionStats);
             }
 
             serviceWorker.getWorkerContext().postSuperstep();
@@ -661,10 +593,7 @@ public class GraphMapper<I extends Writa
                          " maxMem=" + Runtime.getRuntime().maxMemory() +
                          " freeMem=" + Runtime.getRuntime().freeMemory());
             }
-            workerSentMessages = commService.flush(context);
-        } while (!serviceWorker.finishSuperstep(workerFinishedVertices,
-                                                workerVertices,
-                                                workerEdges,
+        } while (!serviceWorker.finishSuperstep(partitionStatsList,
                                                 workerSentMessages));
         if (LOG.isInfoEnabled()) {
             LOG.info("map: BSP application done " +
@@ -685,9 +614,6 @@ public class GraphMapper<I extends Writa
             return;
         }
 
-        if (commService != null) {
-            commService.closeConnections();
-        }
         if (serviceWorker != null) {
             serviceWorker.cleanup();
         }
@@ -703,11 +629,5 @@ public class GraphMapper<I extends Writa
             zkManager.offlineZooKeeperServers(
                 ZooKeeperManager.State.FINISHED);
         }
-        // Preferably would shut down the service only after
-        // all clients have disconnected (or the exceptions on the
-        // client side ignored).
-        if (commService != null) {
-            commService.close();
-        }
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Tue Nov 15 00:54:20 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.graph;
 
+import org.apache.giraph.comm.WorkerCommunications;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -40,9 +41,11 @@ public class GraphState<I extends Writab
     /** Graph-wide number of edges */
     private long numEdges = -1;
     /** Graph-wide map context */
-    private Mapper.Context context = null;
+    private Mapper.Context context;
     /** Graph-wide BSP Mapper for this Vertex */
-    private GraphMapper<I, V, E, M> graphMapper = null;
+    private GraphMapper<I, V, E, M> graphMapper;
+    /** Graph-wide worker communications */
+    private WorkerCommunications<I, V, E, M> workerCommunications;
 
     public long getSuperstep() {
         return superstep;
@@ -75,7 +78,7 @@ public class GraphState<I extends Writab
         return context;
     }
 
-    public GraphState<I, V , E ,M> setContext(Mapper.Context context) {
+    public GraphState<I, V, E ,M> setContext(Mapper.Context context) {
         this.context = context;
         return this;
     }
@@ -89,4 +92,14 @@ public class GraphState<I extends Writab
         this.graphMapper = graphMapper;
         return this;
     }
+
+    public GraphState<I, V, E, M> setWorkerCommunications(
+            WorkerCommunications<I, V, E, M> workerCommunications) {
+        this.workerCommunications = workerCommunications;
+        return this;
+    }
+
+    public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
+        return workerCommunications;
+    }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Tue Nov 15 00:54:20 2011
@@ -42,8 +42,6 @@ public abstract class LongDoubleFloatDou
     private double vertexValue;
     private OpenLongFloatHashMap verticesWithEdgeValues = new OpenLongFloatHashMap();
     private DoubleArrayList messageList = new DoubleArrayList();
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
 
     @Override
     public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
@@ -118,7 +116,7 @@ public abstract class LongDoubleFloatDou
             throw new IllegalArgumentException(
                     "sendMsg: Cannot send null message to " + id);
         }
-        getGraphState().getGraphMapper().getWorkerCommunications().sendMessageReq(id, msg);
+        getGraphState().getWorkerCommunications().sendMessageReq(id, msg);
     }
 
     @Override
@@ -133,17 +131,10 @@ public abstract class LongDoubleFloatDou
             sendMsg(destVertex, msg);
         }
     }
- //   @Override
-  //  public MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> instantiateVertex() {
-  //      LongDoubleFloatDoubleVertex vertex = (LongDoubleFloatDoubleVertex)
-  //          BspUtils.<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>createVertex(
-  //              getGraphState().getContext().getConfiguration());
-  //      return vertex;
-  //  }
 
     @Override
     public long getNumVertices() {
-      System.out.println("in getNumVertices!");
+        System.out.println("in getNumVertices!");
         return getGraphState().getNumVertices();
     }
 
@@ -154,28 +145,30 @@ public abstract class LongDoubleFloatDou
 
     @Override
     public Iterator<LongWritable> iterator() {
-      final long[] destVertices = verticesWithEdgeValues.keys().elements();
-      final LongWritable lw = new LongWritable();
-      return new Iterator<LongWritable>() {
-        int offset = 0;
-        @Override public boolean hasNext() {
-          return offset < destVertices.length;
-        }
-
-        @Override public LongWritable next() {
-          lw.set(destVertices[offset++]);
-          return lw;
-        }
-
-        @Override public void remove() {
-          throw new UnsupportedOperationException("Mutation disallowed for edge list via iterator");
-        }
-      };
+        final long[] destVertices = verticesWithEdgeValues.keys().elements();
+        final LongWritable lw = new LongWritable();
+        return new Iterator<LongWritable>() {
+            int offset = 0;
+            @Override public boolean hasNext() {
+                return offset < destVertices.length;
+            }
+
+            @Override public LongWritable next() {
+                lw.set(destVertices[offset++]);
+                return lw;
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException(
+                        "Mutation disallowed for edge list via iterator");
+            }
+        };
     }
 
     @Override
     public FloatWritable getEdgeValue(LongWritable targetVertexId) {
-        return new FloatWritable(verticesWithEdgeValues.get(targetVertexId.get()));
+        return new FloatWritable(
+            verticesWithEdgeValues.get(targetVertexId.get()));
     }
 
     @Override
@@ -196,34 +189,26 @@ public abstract class LongDoubleFloatDou
     @Override
     public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex)
             throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().addVertexReq(vertex);
+        getGraphState().getWorkerCommunications().addVertexReq(vertex);
     }
 
     @Override
     public void removeVertexRequest(LongWritable vertexId) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().removeVertexReq(vertexId);
+        getGraphState().getWorkerCommunications().removeVertexReq(vertexId);
     }
 
     @Override
     public void addEdgeRequest(LongWritable vertexIndex,
-                               Edge<LongWritable, FloatWritable> edge) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().addEdgeReq(vertexIndex, edge);
+                               Edge<LongWritable, FloatWritable> edge)
+                               throws IOException {
+        getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex, edge);
     }
 
     @Override
     public void removeEdgeRequest(LongWritable sourceVertexId,
                                   LongWritable destVertexId) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().removeEdgeReq(sourceVertexId, destVertexId);
-    }
-
-    @Override
-    public final void voteToHalt() {
-        halt = true;
-    }
-
-    @Override
-    public final boolean isHalted() {
-        return halt;
+        getGraphState().getWorkerCommunications().removeEdgeReq(
+            sourceVertexId, destVertexId);
     }
 
     @Override

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Tue Nov 15 00:54:20 2011
@@ -77,7 +77,7 @@ public abstract class MutableVertex<I ex
      */
     public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
             throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
         addVertexReq(vertex);
     }
 
@@ -88,7 +88,7 @@ public abstract class MutableVertex<I ex
      * @param vertexId Id of the vertex to be removed.
      */
     public void removeVertexRequest(I vertexId) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
         removeVertexReq(vertexId);
     }
 
@@ -101,7 +101,7 @@ public abstract class MutableVertex<I ex
      */
     public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
             throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             addEdgeReq(sourceVertexId, edge);
     }
 
@@ -114,7 +114,7 @@ public abstract class MutableVertex<I ex
      */
     public void removeEdgeRequest(I sourceVertexId, I destVertexId)
             throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             removeEdgeReq(sourceVertexId, destVertexId);
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Tue Nov 15 00:54:20 2011
@@ -26,11 +26,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 /**
  * User applications often subclass {@link Vertex}, which stores the outbound
@@ -56,10 +55,8 @@ public abstract class Vertex<I extends W
     /** Vertex value */
     private V vertexValue = null;
     /** Map of destination vertices and their edge values */
-    protected final SortedMap<I, Edge<I, E>> destEdgeMap =
-        new TreeMap<I, Edge<I, E>>();
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
+    protected final Map<I, Edge<I, E>> destEdgeMap =
+        new HashMap<I, Edge<I, E>>();
     /** List of incoming messages from the previous superstep */
     private final List<M> msgList = new ArrayList<M>();
 
@@ -69,12 +66,14 @@ public abstract class Vertex<I extends W
             setVertexId(vertexId);
         }
         if (vertexValue != null) {
-          setVertexValue(vertexValue);
+            setVertexValue(vertexValue);
         }
         if (edges != null && !edges.isEmpty()) {
-          for(Map.Entry<I, E> entry : edges.entrySet()) {
-            destEdgeMap.put(entry.getKey(), new Edge<I, E>(entry.getKey(), entry.getValue()));
-          }
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeMap.put(
+                    entry.getKey(),
+                    new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
         }
         if (messages != null && !messages.isEmpty()) {
             msgList.addAll(messages);
@@ -174,41 +173,31 @@ public abstract class Vertex<I extends W
     @Override
     public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
             throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             addVertexReq(vertex);
     }
 
     @Override
     public void removeVertexRequest(I vertexId) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             removeVertexReq(vertexId);
     }
 
     @Override
     public void addEdgeRequest(I vertexIndex,
                                Edge<I, E> edge) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             addEdgeReq(vertexIndex, edge);
     }
 
     @Override
     public void removeEdgeRequest(I sourceVertexId,
                                   I destVertexId) throws IOException {
-        getGraphState().getGraphMapper().getWorkerCommunications().
+        getGraphState().getWorkerCommunications().
             removeEdgeReq(sourceVertexId, destVertexId);
     }
 
     @Override
-    public final void voteToHalt() {
-        halt = true;
-    }
-
-    @Override
-    public final boolean isHalted() {
-        return halt;
-    }
-
-    @Override
     final public void readFields(DataInput in) throws IOException {
         vertexId = BspUtils.<I>createVertexIndex(getConf());
         vertexId.readFields(in);

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/** Immutable vertex/edge count pair; the no-arg constructor yields zeros. */
+public class VertexEdgeCount {
+    /** Vertex total, fixed at construction */
+    private final long vertexCount;
+    /** Edge total, fixed at construction */
+    private final long edgeCount;
+
+    public VertexEdgeCount() {
+        this(0, 0);
+    }
+
+    /** Creates a count holding the given vertex and edge totals. */
+    public VertexEdgeCount(long vertexCount, long edgeCount) {
+        this.vertexCount = vertexCount;
+        this.edgeCount = edgeCount;
+    }
+
+    /** @return the vertex total */
+    public long getVertexCount() {
+        return vertexCount;
+    }
+
+    /** @return the edge total */
+    public long getEdgeCount() {
+        return edgeCount;
+    }
+
+    /** @return a new count: this one plus {@code vertexEdgeCount} */
+    public VertexEdgeCount incrVertexEdgeCount(
+            VertexEdgeCount vertexEdgeCount) {
+        return incrVertexEdgeCount(vertexEdgeCount.getVertexCount(),
+                                   vertexEdgeCount.getEdgeCount());
+    }
+
+    /** @return a new count: this one plus the given deltas */
+    public VertexEdgeCount incrVertexEdgeCount(
+            long vertexCount, long edgeCount) {
+        return new VertexEdgeCount(this.vertexCount + vertexCount,
+                                   this.edgeCount + edgeCount);
+    }
+
+    @Override
+    public String toString() {
+        return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+    }
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Information about a worker that is sent to the master and other workers.
+ * Serialized as hostname, partition id and port; the debugging id
+ * ({@code hostname + "_" + partitionId}) is rebuilt on deserialization.
+ */
+public class WorkerInfo implements Writable {
+    /** Worker hostname */
+    private String hostname;
+    /** Partition id of this worker */
+    private int partitionId = -1;
+    /** Port that the RPC server is using */
+    private int port = -1;
+    /** Hostname + "_" + id for easier debugging */
+    private String hostnameId;
+
+    /**
+     * Constructor for reflection.  The hostname stays null until
+     * {@link #readFields(DataInput)} populates this instance.
+     */
+    public WorkerInfo() {
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param hostname Worker hostname
+     * @param partitionId Partition id of this worker
+     * @param port Port that the RPC server is using
+     */
+    public WorkerInfo(String hostname, int partitionId, int port) {
+        this.hostname = hostname;
+        this.partitionId = partitionId;
+        this.port = port;
+        this.hostnameId = hostname + "_" + partitionId;
+    }
+
+    /**
+     * @return Worker hostname (null if not yet set)
+     */
+    public String getHostname() {
+        return hostname;
+    }
+
+    /**
+     * @return Partition id of this worker (-1 if not yet set)
+     */
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    /**
+     * @return Hostname + "_" + partition id, for debugging
+     */
+    public String getHostnameId() {
+        return hostnameId;
+    }
+
+    /**
+     * @return Port that the RPC server is using (-1 if not yet set)
+     */
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof WorkerInfo)) {
+            return false;
+        }
+        WorkerInfo workerInfo = (WorkerInfo) other;
+        // A reflection-constructed instance has a null hostname until it is
+        // deserialized, so compare null-safely rather than throwing an NPE.
+        boolean sameHostname = (hostname == null) ?
+            (workerInfo.getHostname() == null) :
+            hostname.equals(workerInfo.getHostname());
+        return sameHostname &&
+            (partitionId == workerInfo.getPartitionId()) &&
+            (port == workerInfo.getPort());
+    }
+
+    @Override
+    public int hashCode() {
+        // Must agree with equals(): uses the same three fields.
+        int result = 17;
+        result = 37 * result + port;
+        result = 37 * result + ((hostname == null) ? 0 : hostname.hashCode());
+        result = 37 * result + partitionId;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Worker(hostname=" + hostname + ", MRpartition=" +
+            partitionId + ", port=" + port + ")";
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        hostname = input.readUTF();
+        partitionId = input.readInt();
+        port = input.readInt();
+        // Not serialized; rebuilt from the fields just read.
+        hostnameId = hostname + "_" + partitionId;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeUTF(hostname);
+        output.writeInt(partitionId);
+        output.writeInt(port);
+    }
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Basic partition owner, can be subclassed for more complicated partition
+ * owner implementations.  Tracks the partition id, the current owning worker,
+ * an optional previous owning worker and an optional checkpoint file prefix.
+ */
+public class BasicPartitionOwner implements PartitionOwner, Configurable {
+    /** Configuration */
+    private Configuration conf;
+    /** Partition id */
+    private int partitionId = -1;
+    /** Owning worker information */
+    private WorkerInfo workerInfo;
+    /** Previous (if any) worker info */
+    private WorkerInfo previousWorkerInfo;
+    /** Checkpoint files prefix for this partition */
+    private String checkpointFilesPrefix;
+
+    /**
+     * Constructor for reflection; populate with {@link #readFields}.
+     */
+    public BasicPartitionOwner() {
+    }
+
+    /**
+     * Constructor with no previous owner and no checkpoint prefix.
+     *
+     * @param partitionId Partition id
+     * @param workerInfo Owning worker information
+     */
+    public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
+        this(partitionId, workerInfo, null, null);
+    }
+
+    /**
+     * Full constructor.
+     *
+     * @param partitionId Partition id
+     * @param workerInfo Owning worker information
+     * @param previousWorkerInfo Previous owning worker, may be null
+     * @param checkpointFilesPrefix Checkpoint files prefix, may be null
+     */
+    public BasicPartitionOwner(int partitionId,
+                               WorkerInfo workerInfo,
+                               WorkerInfo previousWorkerInfo,
+                               String checkpointFilesPrefix) {
+        this.partitionId = partitionId;
+        this.workerInfo = workerInfo;
+        this.previousWorkerInfo = previousWorkerInfo;
+        this.checkpointFilesPrefix = checkpointFilesPrefix;
+    }
+
+    @Override
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public WorkerInfo getWorkerInfo() {
+        return workerInfo;
+    }
+
+    @Override
+    public void setWorkerInfo(WorkerInfo workerInfo) {
+        this.workerInfo = workerInfo;
+    }
+
+    @Override
+    public WorkerInfo getPreviousWorkerInfo() {
+        return previousWorkerInfo;
+    }
+
+    @Override
+    public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
+        this.previousWorkerInfo = workerInfo;
+    }
+
+    @Override
+    public String getCheckpointFilesPrefix() {
+        return checkpointFilesPrefix;
+    }
+
+    @Override
+    public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
+        this.checkpointFilesPrefix = checkpointFilesPrefix;
+    }
+
+    /**
+     * Overwrites every field of this owner from the stream.  Optional fields
+     * absent from the record are reset to null, so an instance reused for a
+     * second read does not keep stale values from the first one.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        partitionId = input.readInt();
+        workerInfo = new WorkerInfo();
+        workerInfo.readFields(input);
+        boolean hasPreviousWorkerInfo = input.readBoolean();
+        if (hasPreviousWorkerInfo) {
+            previousWorkerInfo = new WorkerInfo();
+            previousWorkerInfo.readFields(input);
+        } else {
+            previousWorkerInfo = null;
+        }
+        boolean hasCheckpointFilePrefix = input.readBoolean();
+        if (hasCheckpointFilePrefix) {
+            checkpointFilesPrefix = input.readUTF();
+        } else {
+            checkpointFilesPrefix = null;
+        }
+    }
+
+    /**
+     * Writes the partition id and owning worker, then each optional field
+     * preceded by a presence flag.  Expects workerInfo to be set.
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(partitionId);
+        workerInfo.write(output);
+        if (previousWorkerInfo != null) {
+            output.writeBoolean(true);
+            previousWorkerInfo.write(output);
+        } else {
+            output.writeBoolean(false);
+        }
+        if (checkpointFilesPrefix != null) {
+            output.writeBoolean(true);
+            output.writeUTF(checkpointFilesPrefix);
+        } else {
+            output.writeBoolean(false);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public String toString() {
+        return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
+               previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Pluggable source of the partitioning strategy for an application: it
+ * hands out the partitioner the master uses and the partitioner each worker
+ * uses.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface GraphPartitionerFactory<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Builds the worker-side partitioner.  Every worker calls this once and
+     * keeps reusing the result.
+     *
+     * @return Newly created {@link WorkerGraphPartitioner}
+     */
+    WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+
+    /**
+     * Builds the master-side partitioner.  The master calls this once and
+     * keeps reusing the result.
+     *
+     * @return Newly created {@link MasterGraphPartitioner}
+     */
+    MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Master will execute a hash based partitioning.  Initial partitions are
+ * assigned to the available workers round-robin; later rebalancing is
+ * delegated to {@link PartitionBalancer}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashMasterPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> implements
+        MasterGraphPartitioner<I, V, E, M> {
+    /** Provided configuration */
+    private final Configuration conf;
+    /** Specified partition count (overrides calculation) */
+    private final int userPartitionCount;
+    /** Partition count (calculated in createInitialPartitionOwners) */
+    private int partitionCount = -1;
+    /** Save the last generated partition owner list */
+    private List<PartitionOwner> partitionOwnerList;
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(HashMasterPartitioner.class);
+
+    /**
+     * ZooKeeper limits the data in a single znode to 1 MB, and each partition
+     * owner entry averages somewhat more than 300 bytes, so the partition
+     * count is capped at roughly 1 MB / 350 bytes.
+     */
+    private static final int MAX_PARTITIONS = 1024 * 1024 / 350;
+
+    /**
+     * Multiplier for the current workers squared
+     */
+    public static final String PARTITION_COUNT_MULTIPLIER =
+        "hash.masterPartitionCountMultipler";
+    /** Default value for {@link #PARTITION_COUNT_MULTIPLIER} */
+    public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 0.5f;
+
+    /** Overrides default partition count calculation if not -1 */
+    public static final String USER_PARTITION_COUNT =
+        "hash.userPartitionCount";
+    /** Default value for {@link #USER_PARTITION_COUNT} (no override) */
+    public static final int DEFAULT_USER_PARTITION_COUNT = -1;
+
+    /**
+     * Constructor.
+     *
+     * @param conf Configuration holding the partitioning settings
+     */
+    public HashMasterPartitioner(Configuration conf) {
+        this.conf = conf;
+        userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
+                                         DEFAULT_USER_PARTITION_COUNT);
+    }
+
+    /**
+     * Creates the initial partitions and assigns them to the available
+     * workers round-robin.  The count is either the user override or
+     * max(1, multiplier * workers^2), capped at what fits in one znode.
+     *
+     * @throws IllegalArgumentException if no workers are available, or the
+     *         user partition count is neither positive nor the default
+     */
+    @Override
+    public Collection<PartitionOwner> createInitialPartitionOwners(
+            Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+        if (availableWorkerInfos.isEmpty()) {
+            throw new IllegalArgumentException(
+                "createInitialPartitionOwners: No available workers");
+        }
+        // long arithmetic so the logged value cannot overflow
+        long workerCountSquared =
+            (long) availableWorkerInfos.size() * availableWorkerInfos.size();
+        if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
+            float multiplier = conf.getFloat(
+                PARTITION_COUNT_MULTIPLIER,
+                DEFAULT_PARTITION_COUNT_MULTIPLIER);
+            partitionCount =
+                Math.max((int) (multiplier * availableWorkerInfos.size() *
+                         availableWorkerInfos.size()),
+                         1);
+        } else if (userPartitionCount > 0) {
+            partitionCount = userPartitionCount;
+        } else {
+            // Zero or negative partitions would leave workers with an empty
+            // owner list (and a divide by zero when hashing vertex ids).
+            throw new IllegalArgumentException(
+                "createInitialPartitionOwners: " + USER_PARTITION_COUNT +
+                " must be positive (or " + DEFAULT_USER_PARTITION_COUNT +
+                " to compute it), but was " + userPartitionCount);
+        }
+        if (partitionCount > MAX_PARTITIONS) {
+            LOG.warn("createInitialPartitionOwners: " +
+                    "Reducing the partitionCount to " + MAX_PARTITIONS +
+                    " from " + partitionCount);
+            partitionCount = MAX_PARTITIONS;
+        }
+        // Logged after capping so it reports the count actually used.
+        if (LOG.isInfoEnabled()) {
+            LOG.info("createInitialPartitionOwners: Creating " +
+                     partitionCount + " instead of " +
+                     workerCountSquared + " partitions.");
+        }
+
+        List<PartitionOwner> ownerList =
+            new ArrayList<PartitionOwner>(partitionCount);
+        Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+        for (int i = 0; i < partitionCount; ++i) {
+            PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
+            if (!workerIt.hasNext()) {
+                // Wrap around to keep assigning workers round-robin.
+                workerIt = availableWorkerInfos.iterator();
+            }
+            ownerList.add(owner);
+        }
+        this.partitionOwnerList = ownerList;
+        return ownerList;
+    }
+
+    @Override
+    public Collection<PartitionOwner> getCurrentPartitionOwners() {
+        return partitionOwnerList;
+    }
+
+    /**
+     * Subclasses can set the partition owner list.
+     *
+     * @param partitionOwnerList New partition owner list.
+     */
+    protected void setPartitionOwnerList(List<PartitionOwner>
+            partitionOwnerList) {
+        this.partitionOwnerList = partitionOwnerList;
+    }
+
+    /**
+     * Delegates rebalancing of the current owners to
+     * {@link PartitionBalancer#balancePartitionsAcrossWorkers}.
+     */
+    @Override
+    public Collection<PartitionOwner> generateChangedPartitionOwners(
+            Collection<PartitionStats> allPartitionStatsList,
+            Collection<WorkerInfo> availableWorkerInfos,
+            int maxWorkers,
+            long superstep) {
+        return PartitionBalancer.balancePartitionsAcrossWorkers(
+            conf,
+            partitionOwnerList,
+            allPartitionStatsList,
+            availableWorkerInfos);
+    }
+
+    @Override
+    public PartitionStats createPartitionStats() {
+        return new PartitionStats();
+    }
+
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Divides the vertices into partitions by their hash code using a simple
+ * round-robin hash for great balancing if given a random hash code.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashPartitionerFactory<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements Configurable,
+        GraphPartitionerFactory<I, V, E, M> {
+    private Configuration conf;
+
+    @Override
+    public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+        return new HashMasterPartitioner<I, V, E, M>(getConf());
+    }
+
+    @Override
+    public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+        return new HashWorkerPartitioner<I, V, E, M>();
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Divides the vertices into partitions by their hash code using ranges of the
+ * hash space.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangePartitionerFactory<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+    private Configuration conf;
+
+    @Override
+    public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+        return new HashMasterPartitioner<I, V, E, M>(getConf());
+    }
+
+    @Override
+    public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+        return new HashRangeWorkerPartitioner<I, V, E, M>();
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Implements range-based partitioning from the id hash code.  The absolute
+ * value of a vertex id's hash code is divided by a range width of
+ * {@code Integer.MAX_VALUE / partitionCount}, so ids with nearby hash codes
+ * land in the same partition.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangeWorkerPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        extends HashWorkerPartitioner<I, V, E, M> {
+    /**
+     * Maps a vertex id to the owner of the hash range containing it.
+     *
+     * @param vertexId Vertex id to locate
+     * @return Owner of the partition for this id
+     */
+    @Override
+    public PartitionOwner getPartitionOwner(I vertexId) {
+        int partitionCount = getPartitionOwners().size();
+        int rangeSize = Integer.MAX_VALUE / partitionCount;
+        // Widen before abs(): Math.abs(Integer.MIN_VALUE) is still negative
+        // in int arithmetic and would yield a negative index.
+        long absHash = Math.abs((long) vertexId.hashCode());
+        // rangeSize is truncated, so hash codes at the very top of the space
+        // can compute an index equal to partitionCount; fold those into the
+        // last partition instead of indexing past the end of the list.
+        int index = (int) Math.min(absHash / rangeSize, partitionCount - 1L);
+        return partitionOwnerList.get(index);
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Implements hash-based partitioning from the id hash code.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashWorkerPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements WorkerGraphPartitioner<I, V, E, M> {
+    /** Mapping of the vertex ids to {@link PartitionOwner} */
+    protected List<PartitionOwner> partitionOwnerList =
+        new ArrayList<PartitionOwner>();
+
+    @Override
+    public PartitionOwner createPartitionOwner() {
+        return new BasicPartitionOwner();
+    }
+
+    @Override
+    public PartitionOwner getPartitionOwner(I vertexId) {
+        return partitionOwnerList.get(Math.abs(vertexId.hashCode())
+                % partitionOwnerList.size());
+    }
+
+    @Override
+    public Collection<PartitionStats> finalizePartitionStats(
+            Collection<PartitionStats> workerPartitionStats,
+            Map<Integer, Partition<I, V, E, M>> partitionMap) {
+        // No modification necessary
+        return workerPartitionStats;
+    }
+
+    @Override
+    public PartitionExchange updatePartitionOwners(
+            WorkerInfo myWorkerInfo,
+            Collection<? extends PartitionOwner> masterSetPartitionOwners,
+            Map<Integer, Partition<I, V, E, M>> partitionMap) {
+        partitionOwnerList.clear();
+        partitionOwnerList.addAll(masterSetPartitionOwners);
+
+        Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+        Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+            new HashMap<WorkerInfo, List<Integer>>();
+        for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+            if (partitionOwner.getPreviousWorkerInfo() == null) {
+                continue;
+            } else if (partitionOwner.getWorkerInfo().equals(
+                       myWorkerInfo) &&
+                       partitionOwner.getPreviousWorkerInfo().equals(
+                       myWorkerInfo)) {
+                throw new IllegalStateException(
+                    "updatePartitionOwners: Impossible to have the same " +
+                    "previous and current worker info " + partitionOwner +
+                    " as me " + myWorkerInfo);
+            } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+                dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+            } else if (partitionOwner.getPreviousWorkerInfo().equals(
+                    myWorkerInfo)) {
+                if (workerPartitionOwnerMap.containsKey(
+                        partitionOwner.getWorkerInfo())) {
+                    workerPartitionOwnerMap.get(
+                        partitionOwner.getWorkerInfo()).add(
+                            partitionOwner.getPartitionId());
+                } else {
+                    List<Integer> partitionOwnerList = new ArrayList<Integer>();
+                    partitionOwnerList.add(partitionOwner.getPartitionId());
+                    workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+                                                partitionOwnerList);
+                }
+            }
+        }
+
+        return new PartitionExchange(dependentWorkerSet,
+                                     workerPartitionOwnerMap);
+    }
+
+    @Override
+    public Collection<? extends PartitionOwner> getPartitionOwners() {
+        return partitionOwnerList;
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.graph.WorkerInfo;
+
+/**
+ * Determines how to divide the graph into partitions, how to manipulate
+ * partitions and then how to assign those partitions to workers.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface MasterGraphPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Set some initial partition owners for the graph. Guaranteed to be called
+     * prior to the graph being loaded (initial or restart).
+     *
+     * @param availableWorkerInfos Workers available for partition assignment
+     * @param maxWorkers Maximum number of workers
+     * @return Collection of the initial {@link PartitionOwner} objects
+     */
+    Collection<PartitionOwner> createInitialPartitionOwners(
+            Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
+
+    /**
+     * After the worker stats have been merged to a single list, the master can
+     * use this information to send commands to the workers for any
+     * {@link Partition} changes. This protocol is specific to the
+     * {@link GraphPartitioner} implementation.
+     *
+     * @param allPartitionStatsList All partition stats from all workers.
+     * @param availableWorkers Workers available for partition assignment
+     * @param maxWorkers Maximum number of workers
+     * @param superstep Partition owners will be set for this superstep
+     * @return Collection of {@link PartitionOwner} objects that changed from
+     *         the previous superstep, empty list if no change.
+     */
+    Collection<PartitionOwner> generateChangedPartitionOwners(
+            Collection<PartitionStats> allPartitionStatsList,
+            Collection<WorkerInfo> availableWorkers,
+            int maxWorkers,
+            long superstep);
+
+    /**
+     * Get current partition owners at this time.
+     *
+     * @return Collection of current {@link PartitionOwner} objects
+     */
+    Collection<PartitionOwner> getCurrentPartitionOwners();
+
+    /**
+     * Instantiate the {@link PartitionStats} implementation used to read the
+     * worker stats
+     *
+     * @return Instantiated {@link PartitionStats} object
+     */
+    PartitionStats createPartitionStats();
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * A generic container that stores vertices.  Vertex ids will map to exactly
+ * one partition.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class Partition<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements Writable {
+    /** Configuration from the worker */
+    private final Configuration conf;
+    /** Partition id */
+    private final int partitionId;
+    /** Vertex map for this range (keyed by index) */
+    private final Map<I, BasicVertex<I, V, E, M>> vertexMap =
+        new HashMap<I, BasicVertex<I, V, E, M>>();
+
+    /**
+     * Create an empty partition.
+     *
+     * @param conf Configuration used to instantiate vertices when
+     *        deserializing in {@link #readFields(DataInput)}
+     * @param partitionId Id of this partition
+     */
+    public Partition(Configuration conf, int partitionId) {
+        this.conf = conf;
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Get the vertex for this vertex index.
+     *
+     * @param vertexIndex Vertex index to search for
+     * @return Vertex if it exists, null otherwise
+     */
+    public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
+        return vertexMap.get(vertexIndex);
+    }
+
+    /**
+     * Put a vertex into the Partition, replacing any vertex with the same id.
+     *
+     * @param vertex Vertex to put in the Partition
+     * @return old vertex value (i.e. null if none existed prior)
+     */
+    public BasicVertex<I, V, E, M> putVertex(BasicVertex<I, V, E, M> vertex) {
+        return vertexMap.put(vertex.getVertexId(), vertex);
+    }
+
+    /**
+     * Remove a vertex from the Partition
+     *
+     * @param vertexIndex Vertex index to remove
+     * @return The removed vertex, or null if no vertex had that index
+     */
+    public BasicVertex<I, V, E, M> removeVertex(I vertexIndex) {
+        return vertexMap.remove(vertexIndex);
+    }
+
+    /**
+     * Get a collection of the vertices.
+     *
+     * @return Live view of the vertices backed by this partition (not a copy)
+     */
+    public Collection<BasicVertex<I, V, E , M>> getVertices() {
+        return vertexMap.values();
+    }
+
+    /**
+     * Get the number of edges in this partition.  Computed on the fly by
+     * summing the out-edge count of every vertex.
+     *
+     * @return Number of edges.
+     */
+    public long getEdgeCount() {
+        long edges = 0;
+        for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
+            edges += vertex.getNumOutEdges();
+        }
+        return edges;
+    }
+
+    /**
+     * Get the partition id.
+     *
+     * @return Partition id of this partition.
+     */
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public String toString() {
+        return "(id=" + getPartitionId() + ",V=" + vertexMap.size() +
+            ",E=" + getEdgeCount() + ")";
+    }
+
+    /**
+     * Read vertices in the format produced by {@link #write(DataOutput)} and
+     * add them to this partition.  Existing vertices are not cleared first,
+     * and the partition id is not part of the serialized form (it comes from
+     * the constructor).
+     *
+     * @param input Input to read from
+     * @throws IOException If reading fails
+     * @throws IllegalStateException If a read vertex id is already present
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int vertices = input.readInt();
+        for (int i = 0; i < vertices; ++i) {
+            BasicVertex<I, V, E, M> vertex =
+                BspUtils.<I, V, E, M>createVertex(conf);
+            vertex.readFields(input);
+            // The map holds any BasicVertex, so store the vertex as created;
+            // casting to a concrete vertex class would reject other
+            // BasicVertex implementations with a ClassCastException.
+            if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
+                throw new IllegalStateException(
+                    "readFields: " + this +
+                    " already has same id " + vertex);
+            }
+        }
+    }
+
+    /**
+     * Write the vertex count followed by each vertex.
+     *
+     * @param output Output to write to
+     * @throws IOException If writing fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(vertexMap.size());
+        for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
+            vertex.write(output);
+        }
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Helper class for balancing partitions across a set of workers.
+ */
+public class PartitionBalancer {
+    /** Partition balancing algorithm */
+    public static final String PARTITION_BALANCE_ALGORITHM =
+        "hash.partitionBalanceAlgorithm";
+    /** Keep the partition owners exactly as they are (the default) */
+    public static final String STATIC_BALANCE_ALGORITHM =
+        "static";
+    /** Balance partitions using their edge counts */
+    public static final String EDGE_BALANCE_ALGORITHM =
+        "edges";
+    /**
+     * Misspelled alias of {@link #EDGE_BALANCE_ALGORITHM}, kept for
+     * backward compatibility.
+     *
+     * @deprecated Use {@link #EDGE_BALANCE_ALGORITHM} instead.
+     */
+    @Deprecated
+    public static final String EGDE_BALANCE_ALGORITHM =
+        EDGE_BALANCE_ALGORITHM;
+    /** Balance partitions using their vertex counts */
+    public static final String VERTICES_BALANCE_ALGORITHM =
+        "vertices";
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(PartitionBalancer.class);
+
+    /**
+     * What value to balance partitions with?  Edges, vertices?
+     */
+    private enum BalanceValue {
+        UNSET,
+        EDGES,
+        VERTICES
+    }
+
+    /**
+     * Compare two long values without overflow.  Subtracting and casting to
+     * int, as in {@code (int) (a - b)}, truncates large differences and can
+     * report the wrong sign.  (Long.compare only exists from Java 7 on.)
+     *
+     * @param a First value
+     * @param b Second value
+     * @return Negative, zero or positive as a is less than, equal to or
+     *         greater than b
+     */
+    private static int compareLongs(long a, long b) {
+        if (a < b) {
+            return -1;
+        }
+        return (a == b) ? 0 : 1;
+    }
+
+    /**
+     * Get the value used to balance.
+     *
+     * @param partitionStat Stats of the partition to measure
+     * @param balanceValue Which statistic to use (EDGES or VERTICES)
+     * @return Edge count or vertex count of the partition
+     * @throws IllegalArgumentException If balanceValue is neither EDGES nor
+     *         VERTICES
+     */
+    private static long getBalanceValue(PartitionStats partitionStat,
+                                        BalanceValue balanceValue) {
+        switch (balanceValue) {
+            case EDGES:
+                return partitionStat.getEdgeCount();
+            case VERTICES:
+                return partitionStat.getVertexCount();
+            default:
+                throw new IllegalArgumentException(
+                    "getBalanceValue: Illegal balance value " + balanceValue);
+        }
+    }
+
+    /**
+     * Used to sort the partition owners from lowest value to highest value
+     */
+    private static class PartitionOwnerComparator implements
+            Comparator<PartitionOwner> {
+        /** Map of owner to stats */
+        private final Map<PartitionOwner, PartitionStats> ownerStatMap;
+        /** Value type to compare on */
+        private final BalanceValue balanceValue;
+
+        /**
+         * Only constructor.
+         *
+         * @param ownerStatMap Map of owner to the stats used for comparison
+         * @param balanceValue Which statistic to compare on
+         */
+        public PartitionOwnerComparator(
+                Map<PartitionOwner, PartitionStats> ownerStatMap,
+                BalanceValue balanceValue) {
+            this.ownerStatMap = ownerStatMap;
+            this.balanceValue = balanceValue;
+        }
+
+        @Override
+        public int compare(PartitionOwner owner1, PartitionOwner owner2) {
+            return compareLongs(
+                getBalanceValue(ownerStatMap.get(owner1), balanceValue),
+                getBalanceValue(ownerStatMap.get(owner2), balanceValue));
+        }
+    }
+
+    /**
+     * Structure to keep track of how much value a {@link WorkerInfo} has
+     * been assigned.
+     */
+    private static class WorkerInfoAssignments implements
+            Comparable<WorkerInfoAssignments> {
+        /** Worker info associated */
+        private final WorkerInfo workerInfo;
+        /** Balance value */
+        private final BalanceValue balanceValue;
+        /** Map of owner to stats */
+        private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
+        /** Current value of this object */
+        private long value = 0;
+
+        /**
+         * Only constructor.
+         *
+         * @param workerInfo Worker whose assignments are tracked
+         * @param balanceValue Which statistic to accumulate
+         * @param ownerStatsMap Map of owner to stats
+         */
+        public WorkerInfoAssignments(
+                WorkerInfo workerInfo,
+                BalanceValue balanceValue,
+                Map<PartitionOwner, PartitionStats> ownerStatsMap) {
+            this.workerInfo = workerInfo;
+            this.balanceValue = balanceValue;
+            this.ownerStatsMap = ownerStatsMap;
+        }
+
+        /**
+         * Get the total value of all partitions assigned to this worker.
+         *
+         * @return Total value of all partition assignments.
+         */
+        public long getValue() {
+            return value;
+        }
+
+        /**
+         * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.  If the
+         * owner moves, its current worker becomes its previous worker;
+         * otherwise its previous worker is cleared.
+         *
+         * @param partitionOwner PartitionOwner to assign (modified in place).
+         */
+        public void assignPartitionOwner(
+                PartitionOwner partitionOwner) {
+            value += getBalanceValue(ownerStatsMap.get(partitionOwner),
+                                     balanceValue);
+            if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
+                partitionOwner.setPreviousWorkerInfo(
+                    partitionOwner.getWorkerInfo());
+                partitionOwner.setWorkerInfo(workerInfo);
+            } else {
+                partitionOwner.setPreviousWorkerInfo(null);
+            }
+        }
+
+        @Override
+        public int compareTo(WorkerInfoAssignments other) {
+            return compareLongs(getValue(), other.getValue());
+        }
+    }
+
+    /**
+     * Balance the partitions with an algorithm based on a value.
+     *
+     * @param conf Configuration to find the algorithm
+     * @param partitionOwners Current partition owners; for the edge/vertex
+     *        algorithms these objects are reassigned in place
+     * @param allPartitionStats All the partition stats
+     * @param availableWorkerInfos All the available workers
+     * @return Balanced partition owners (the input collection itself for the
+     *         static algorithm)
+     * @throws IllegalArgumentException If the configured algorithm is
+     *         unknown, or there are partitions to place but no workers
+     * @throws IllegalStateException If partition stats are duplicated or
+     *         missing for an owner, or a partition owner is duplicated
+     */
+    public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
+        Configuration conf,
+        Collection<PartitionOwner> partitionOwners,
+        Collection<PartitionStats> allPartitionStats,
+        Collection<WorkerInfo> availableWorkerInfos) {
+
+        String balanceAlgorithm =
+            conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
+        if (LOG.isInfoEnabled()) {
+            LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
+                     balanceAlgorithm);
+        }
+        BalanceValue balanceValue = BalanceValue.UNSET;
+        if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
+            return partitionOwners;
+        } else if (balanceAlgorithm.equals(EDGE_BALANCE_ALGORITHM)) {
+            balanceValue = BalanceValue.EDGES;
+        } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
+            balanceValue = BalanceValue.VERTICES;
+        } else {
+            throw new IllegalArgumentException(
+                "balancePartitionsAcrossWorkers: Illegal balance " +
+                "algorithm - " + balanceAlgorithm);
+        }
+
+        // Join the partition stats and partition owners by partition id
+        Map<Integer, PartitionStats> idStatMap =
+            new HashMap<Integer, PartitionStats>();
+        for (PartitionStats partitionStats : allPartitionStats) {
+            if (idStatMap.put(partitionStats.getPartitionId(), partitionStats)
+                    != null) {
+                throw new IllegalStateException(
+                    "balancePartitionsAcrossWorkers: Duplicate partition id " +
+                    "for " + partitionStats);
+            }
+        }
+        Map<PartitionOwner, PartitionStats> ownerStatsMap =
+            new HashMap<PartitionOwner, PartitionStats>();
+        for (PartitionOwner partitionOwner : partitionOwners) {
+            PartitionStats partitionStats =
+                idStatMap.get(partitionOwner.getPartitionId());
+            if (partitionStats == null) {
+                throw new IllegalStateException(
+                    "balancePartitionsAcrossWorkers: Missing partition " +
+                    "stats for " + partitionOwner);
+            }
+            if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
+                throw new IllegalStateException(
+                    "balancePartitionsAcrossWorkers: Duplicate partition " +
+                    "owner " + partitionOwner);
+            }
+        }
+        if (ownerStatsMap.size() != partitionOwners.size()) {
+            throw new IllegalStateException(
+                "balancePartitionsAcrossWorkers: ownerStats count = " +
+                ownerStatsMap.size() + ", partitionOwners count = " +
+                partitionOwners.size() + " and should match.");
+        }
+
+        // Without workers the min-heap below would be empty and remove()
+        // would fail with an uninformative NoSuchElementException.
+        if (availableWorkerInfos.isEmpty() && !partitionOwners.isEmpty()) {
+            throw new IllegalArgumentException(
+                "balancePartitionsAcrossWorkers: No available workers to " +
+                "assign " + partitionOwners.size() + " partitions to");
+        }
+
+        List<WorkerInfoAssignments> workerInfoAssignmentsList =
+            new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
+        for (WorkerInfo workerInfo : availableWorkerInfos) {
+            workerInfoAssignmentsList.add(
+                new WorkerInfoAssignments(
+                    workerInfo, balanceValue, ownerStatsMap));
+        }
+
+        // A simple heuristic for balancing the partitions across the workers
+        // using a value (edges, vertices).  An improvement would be to
+        // take into account the already existing partition worker assignments.
+        // 1.  Sort the partitions by size
+        // 2.  Place the workers in a min heap sorted by their total balance
+        //     value.
+        // 3.  From largest partition to the smallest, take the partition
+        //     worker at the top of the heap, add the partition to it, and
+        //     then put it back in the heap
+        List<PartitionOwner> partitionOwnerList =
+            new ArrayList<PartitionOwner>(partitionOwners);
+        Collections.sort(partitionOwnerList,
+            Collections.reverseOrder(
+                new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
+        PriorityQueue<WorkerInfoAssignments> minQueue =
+            new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
+        for (PartitionOwner partitionOwner : partitionOwnerList) {
+            // Remove before mutating the value, then re-add, so the heap
+            // ordering stays valid.
+            WorkerInfoAssignments chosenWorker = minQueue.remove();
+            chosenWorker.assignPartitionOwner(partitionOwner);
+            minQueue.add(chosenWorker);
+        }
+
+        return partitionOwnerList;
+    }
+}
+

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.graph.WorkerInfo;
+
+/**
+ * Captures what one worker must do in a potential partition exchange: the
+ * workers it has to wait on, and the partitions it has to ship to other
+ * workers.
+ */
+public class PartitionExchange {
+    /** Workers this worker depends on before it can continue */
+    private final Set<WorkerInfo> myDependencyWorkerSet;
+    /** Destination worker mapped to the ids of partitions to send it */
+    private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
+
+    /**
+     * Only constructor.  Both arguments are kept as given (no copies).
+     *
+     * @param myDependencyWorkerSet Workers this worker must wait for
+     * @param sendWorkerPartitionMap Partition ids this worker must send,
+     *        keyed by destination worker
+     */
+    public PartitionExchange(
+            Set<WorkerInfo> myDependencyWorkerSet,
+            Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
+        this.sendWorkerPartitionMap = sendWorkerPartitionMap;
+        this.myDependencyWorkerSet = myDependencyWorkerSet;
+    }
+
+    /**
+     * Workers that must be waited on before continuing.
+     *
+     * @return The dependency worker set passed to the constructor
+     */
+    public Set<WorkerInfo> getMyDependencyWorkerSet() {
+        return this.myDependencyWorkerSet;
+    }
+
+    /**
+     * Partitions to send, grouped by the worker that should receive them.
+     *
+     * @return The worker-to-partition-ids map passed to the constructor
+     */
+    public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
+        return this.sendWorkerPartitionMap;
+    }
+
+    /**
+     * Whether this worker takes part in the exchange at all, either by
+     * waiting on other workers or by sending partitions.
+     *
+     * @return False only when there is nothing to wait for and nothing to
+     *         send.
+     */
+    public boolean doExchange() {
+        return !(myDependencyWorkerSet.isEmpty() &&
+                 sendWorkerPartitionMap.isEmpty());
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Describes which worker owns a partition, which worker owned it before,
+ * and where its checkpoint data lives when restarting.
+ */
+public interface PartitionOwner extends Writable {
+    /**
+     * Id of the partition; corresponds to a {@link Partition} object.
+     *
+     * @return Partition id
+     */
+    int getPartitionId();
+
+    /**
+     * The worker currently responsible for this partition id.
+     *
+     * @return Current owning worker information.
+     */
+    WorkerInfo getWorkerInfo();
+
+    /**
+     * Change the worker currently responsible for this partition id.
+     *
+     * @param workerInfo New owning worker information
+     */
+    void setWorkerInfo(WorkerInfo workerInfo);
+
+    /**
+     * The worker that was responsible for this partition id before the
+     * current one.
+     *
+     * @return Previous owning worker information, or null if there is none.
+     */
+    WorkerInfo getPreviousWorkerInfo();
+
+    /**
+     * Record the worker that was previously responsible for this partition.
+     *
+     * @param workerInfo Previous owning worker information (may be null for
+     *        "no previous owner")
+     */
+    void setPreviousWorkerInfo(WorkerInfo workerInfo);
+
+    /**
+     * When restarting from a checkpoint, tells the worker where on HDFS the
+     * checkpointed data of this partition was stored.
+     *
+     * @return HDFS checkpoint file prefix for this partition, or null when
+     *         the superstep is not a restart.
+     */
+    String getCheckpointFilesPrefix();
+
+    /**
+     * Record the HDFS checkpoint file prefix.  Set by the master.
+     *
+     * @param checkpointFilesPrefix HDFS checkpoint file prefix
+     */
+    void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message