giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ikabi...@apache.org
Subject [2/2] git commit: updated refs/heads/trunk to ca36f1d
Date Wed, 27 Jan 2016 19:44:18 GMT
Use Partitions in LocalBlockRunner

Summary:
Speed up LocalBlockRunner by operating on vertices stored in partitions instead of directly on a TestGraph.
Along with this, deprecate the old non-SimplePartitionerFactory way of specifying partitioning
(SimplePartitionerFactory is renamed to the old name GraphPartitionerFactory, and the previous
 GraphPartitionerFactory becomes GraphPartitionerFactoryInterface).

Test Plan:
Run the speed benchmarks in TestLocalBlockRunnerSpeed (results shown as before -> after):

  testEmptyIterationsSmallGraph()
    6.5 -> 6.3
  testEmptyIterationsSyntheticGraphLowDegree()
    42.0 -> 13.8
  testEmptyIterationsSyntheticGraphHighDegree()
    3.6 -> 2.0
  testPageRankSyntheticGraphLowDegree()
    51.0 -> 47.2
  testPageRankSyntheticGraphHighDegree()
    20.3 -> 17.4

Reviewers: maja.kabiljo, sergey.edunov, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D52425


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ca36f1d4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ca36f1d4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ca36f1d4

Branch: refs/heads/trunk
Commit: ca36f1d499a70b1aee85ac637de53f153a9f4c92
Parents: 1e802da
Author: Igor Kabiljo <ikabiljo@fb.com>
Authored: Tue Dec 29 15:14:32 2015 -0800
Committer: Igor Kabiljo <ikabiljo@fb.com>
Committed: Wed Jan 27 11:44:02 2016 -0800

----------------------------------------------------------------------
 .../api/local/TestLocalBlockRunnerSpeed.java    | 144 +++++++++++++++++++
 .../giraph/block_app/framework/BlockUtils.java  |   7 +
 .../framework/api/local/InternalApi.java        |  67 +++++++--
 .../framework/api/local/LocalBlockRunner.java   |  71 +++++----
 .../framework/api/local/VertexSaver.java        |  34 -----
 .../framework/internal/BlockMasterLogic.java    |  20 ++-
 .../internal/BlockWorkerContextLogic.java       |   6 +-
 .../test_setup/graphs/SyntheticGraphInit.java   |   7 -
 .../MultipleSimultanousMutationsTest.java       |  89 ++++++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |   2 +-
 .../SuperstepHashPartitionerFactory.java        | 125 ----------------
 .../apache/giraph/integration/package-info.java |  21 ---
 .../partition/GraphPartitionerFactory.java      |  96 ++++++++++---
 .../GraphPartitionerFactoryInterface.java       |  59 ++++++++
 .../giraph/partition/HashMasterPartitioner.java | 117 ---------------
 .../partition/HashPartitionerFactory.java       |  23 +--
 .../partition/HashRangePartitionerFactory.java  |  25 ++--
 .../partition/HashRangeWorkerPartitioner.java   |  49 -------
 .../giraph/partition/HashWorkerPartitioner.java |  77 ----------
 .../LongMappingStorePartitionerFactory.java     |   6 +-
 .../partition/MasterGraphPartitionerImpl.java   | 111 ++++++++++++++
 .../SimpleIntRangePartitionerFactory.java       |   6 +-
 .../SimpleLongRangePartitionerFactory.java      |   6 +-
 .../partition/SimpleMasterPartitioner.java      | 112 ---------------
 .../partition/SimplePartitionerFactory.java     | 121 ----------------
 .../partition/SimpleWorkerPartitioner.java      | 109 --------------
 .../partition/WorkerGraphPartitionerImpl.java   | 109 ++++++++++++++
 .../apache/giraph/writable/kryo/HadoopKryo.java |   4 +
 .../org/apache/giraph/TestGraphPartitioner.java |  50 +------
 29 files changed, 753 insertions(+), 920 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
new file mode 100644
index 0000000..44145e3
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import org.apache.giraph.block_app.examples.pagerank.AbstractPageRankExampleBlockFactory;
+import org.apache.giraph.block_app.examples.pagerank.PageRankExampleBlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.block_app.test_setup.graphs.SyntheticGraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestLocalBlockRunnerSpeed {
+  public static class EmptyPiecesBlockFactory extends AbstractPageRankExampleBlockFactory {
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return new RepeatBlock(NUM_ITERATIONS.get(conf), new Piece<>());
+    }
+  }
+
+  @BeforeClass
+  public static void warmup() throws Exception {
+    TestGraphUtils.runTest(
+        new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+          BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+        });
+  }
+
+  @Test
+  @Ignore("use for benchmarking")
+  public void testEmptyIterationsSmallGraph() throws Exception {
+    TestGraphUtils.runTest(
+        new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+          BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10000);
+        });
+  }
+
+  @Test
+  @Ignore("use for benchmarking")
+  public void testEmptyIterationsSyntheticGraphLowDegree() throws Exception {
+    TestGraphUtils.runTest(
+        new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+
+          SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
+          SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
+          SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+        });
+  }
+
+  @Test
+  @Ignore("use for benchmarking")
+  public void testEmptyIterationsSyntheticGraphHighDegree() throws Exception {
+    TestGraphUtils.runTest(
+        new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+
+          SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
+          SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
+          SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+        });
+  }
+
+  @Test
+  @Ignore("use for benchmarking")
+  public void testPageRankSyntheticGraphLowDegree() throws Exception {
+    TestGraphUtils.runTest(
+        TestGraphUtils.chainModifiers(
+            new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+            new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);
+
+          SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
+          SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
+          SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+        });
+  }
+
+  @Test
+  @Ignore("use for benchmarking")
+  public void testPageRankSyntheticGraphHighDegree() throws Exception {
+    TestGraphUtils.runTest(
+        TestGraphUtils.chainModifiers(
+            new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+            new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
+        null,
+        (GiraphConfiguration conf) -> {
+          LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+          BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);
+
+          SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
+          SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
+          SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
index e00909c..6bf6d92 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
@@ -28,6 +28,7 @@ import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
 import org.apache.giraph.block_app.framework.block.Block;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.conf.BooleanConfOption;
 import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -58,6 +59,12 @@ public class BlockUtils {
           Object.class, Object.class,
           "block worker context value class");
 
+  /** Property describing whether to log execution status as application runs */
+  public static final
+  BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
+      "giraph.block_utils.log_execution_status", true,
+      "Log execution status (of which pieces are being executed, etc)");
+
   private static final Logger LOG = Logger.getLogger(BlockUtils.class);
 
   /** Dissallow constructor */

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
index 3ca8b1c..a4703b4 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -18,7 +18,6 @@
 package org.apache.giraph.block_app.framework.api.local;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -53,6 +52,8 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
+import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.partition.Partition;
 import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.giraph.utils.TestGraph;
 import org.apache.giraph.utils.WritableUtils;
@@ -74,7 +75,10 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 class InternalApi<I extends WritableComparable, V extends Writable,
     E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
-  private final TestGraph<I, V, E> graph;
+  private final TestGraph<I, V, E> inputGraph;
+  private final List<Partition<I, V, E>> partitions;
+  private final GraphPartitionerFactory<I, V, E> partitionerFactory;
+
   private final ImmutableClassesGiraphConfiguration conf;
   private final boolean runAllChecks;
   private final InternalAggregators globalComm;
@@ -94,8 +98,22 @@ class InternalApi<I extends WritableComparable, V extends Writable,
   public InternalApi(
       TestGraph<I, V, E> graph,
       ImmutableClassesGiraphConfiguration conf,
+      int numPartitions,
       boolean runAllChecks) {
-    this.graph = graph;
+    this.inputGraph = graph;
+    this.partitions = new ArrayList<>(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      this.partitions.add(conf.createPartition(i, null));
+    }
+    this.partitionerFactory = conf.createGraphPartitioner();
+    Preconditions.checkNotNull(this.partitionerFactory);
+    Preconditions.checkState(this.partitions.size() == numPartitions);
+
+    for (Vertex<I, V, E> vertex : graph) {
+      getPartition(vertex.getId()).putVertex(vertex);
+    }
+    graph.clear();
+
     this.conf = conf;
     this.runAllChecks = runAllChecks;
     this.globalComm = new InternalAggregators(runAllChecks);
@@ -362,8 +380,8 @@ class InternalApi<I extends WritableComparable, V extends Writable,
       Collections.EMPTY_SET : previousMessages.targetsSet();
     if (createVertexOnMsgs) {
       for (I target : targets) {
-        if (!graph.getVertices().containsKey(target)) {
-          mutations.put(target, new VertexMutations<I, V, E>());
+        if (getPartition(target).getVertex(target) == null) {
+          mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
         }
       }
     }
@@ -371,23 +389,25 @@ class InternalApi<I extends WritableComparable, V extends Writable,
     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
     for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
       I vertexIndex = entry.getKey();
-      Vertex<I, V, E> originalVertex = graph.getVertex(vertexIndex);
+      Vertex<I, V, E> originalVertex =
+          getPartition(vertexIndex).getVertex(vertexIndex);
       VertexMutations<I, V, E> curMutations = entry.getValue();
       Vertex<I, V, E> vertex = vertexResolver.resolve(
           vertexIndex, originalVertex, curMutations,
           targets.contains(vertexIndex));
 
       if (vertex != null) {
-        graph.addVertex(vertex);
+        getPartition(vertex.getId()).putVertex(vertex);
       } else if (originalVertex != null) {
-        graph.getVertices().remove(originalVertex.getId());
+        getPartition(originalVertex.getId()).removeVertex(
+            originalVertex.getId());
       }
     }
     mutations.clear();
   }
 
-  public Collection<Vertex<I, V, E>> getAllVertices() {
-    return graph.getVertices().values();
+  public List<Partition<I, V, E>> getPartitions() {
+    return partitions;
   }
 
   public InternalWorkerApi getWorkerApi() {
@@ -397,15 +417,19 @@ class InternalApi<I extends WritableComparable, V extends Writable,
   @Override
   public long getTotalNumEdges() {
     int numEdges = 0;
-    for (Vertex<I, V, E> vertex : graph.getVertices().values()) {
-      numEdges += vertex.getNumEdges();
+    for (Partition<I, V, E> partition : partitions) {
+      numEdges += partition.getEdgeCount();
     }
     return numEdges;
   }
 
   @Override
   public long getTotalNumVertices() {
-    return graph.getVertices().size();
+    int numVertices = 0;
+    for (Partition<I, V, E> partition : partitions) {
+      numVertices += partition.getVertexCount();
+    }
+    return numVertices;
   }
 
   @Override
@@ -438,4 +462,21 @@ class InternalApi<I extends WritableComparable, V extends Writable,
   public int getWorkerCount() {
     return 1;
   }
+
+  private int getPartitionId(I id) {
+    Preconditions.checkNotNull(id);
+    return partitionerFactory.getPartition(id, partitions.size(), 1);
+  }
+
+  private Partition<I, V, E> getPartition(I id) {
+    return partitions.get(getPartitionId(id));
+  }
+
+  public void postApplication() {
+    for (Partition<I, V, E> partition : partitions) {
+      for (Vertex<I, V, E> vertex : partition) {
+        inputGraph.setVertex(vertex);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index d582cb2..90aa8a2 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -17,10 +17,8 @@
  */
 package org.apache.giraph.block_app.framework.api.local;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import java.io.IOException;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,8 +38,11 @@ import org.apache.giraph.conf.BooleanConfOption;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.giraph.utils.TestGraph;
+import org.apache.giraph.utils.Trimmable;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 import org.apache.hadoop.io.Writable;
@@ -101,7 +102,7 @@ public class LocalBlockRunner {
   public static
   <I extends WritableComparable, V extends Writable, E extends Writable>
   void runApp(TestGraph<I, V, E> graph) {
-    VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+    SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
     runAppWithVertexOutput(graph, noOpVertexSaver);
   }
 
@@ -113,7 +114,7 @@ public class LocalBlockRunner {
   <I extends WritableComparable, V extends Writable, E extends Writable>
   void runBlock(
       TestGraph<I, V, E> graph, Block block, Object executionStage) {
-    VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+    SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
     runBlockWithVertexOutput(
         block, executionStage, graph, noOpVertexSaver);
   }
@@ -126,7 +127,7 @@ public class LocalBlockRunner {
   public static
   <I extends WritableComparable, V extends Writable, E extends Writable>
   void runAppWithVertexOutput(
-      TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver) {
+      TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
     BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
     runBlockWithVertexOutput(
         factory.createBlock(graph.getConf()),
@@ -142,18 +143,18 @@ public class LocalBlockRunner {
   <I extends WritableComparable, V extends Writable, E extends Writable>
   void runBlockWithVertexOutput(
       Block block, Object executionStage, TestGraph<I, V, E> graph,
-      final VertexSaver<I, V, E> vertexSaver
+      final SimpleVertexWriter<I, V, E> vertexSaver
   ) {
     Preconditions.checkNotNull(block);
     Preconditions.checkNotNull(graph);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
-    int numWorkers = NUM_THREADS.get(conf);
+    int numPartitions = NUM_THREADS.get(conf);
     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
 
     final InternalApi internalApi =
-        new InternalApi(graph, conf, runAllChecks);
+        new InternalApi(graph, conf, numPartitions, runAllChecks);
     final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
 
     BlockUtils.checkBlockTypes(block, executionStage, conf);
@@ -170,8 +171,7 @@ public class LocalBlockRunner {
           }
         }));
 
-    ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
-    Random rand = new Random();
+    ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
 
     if (runAllChecks) {
       for (Vertex<I, V, E> vertex : graph) {
@@ -204,9 +204,15 @@ public class LocalBlockRunner {
           blockMasterLogic.computeNext(superstep);
       if (workerPieces == null) {
         if (!conf.doOutputDuringComputation()) {
-          Collection<Vertex<I, V, E>> vertices = internalApi.getAllVertices();
-          for (Vertex<I, V, E> vertex : vertices) {
-            vertexSaver.saveVertex(vertex);
+          List<Partition<I, V, E>> partitions = internalApi.getPartitions();
+          for (Partition<I, V, E> partition : partitions) {
+            for (Vertex<I, V, E> vertex : partition) {
+              try {
+                vertexSaver.writeVertex(vertex);
+              } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            }
           }
         }
         int left = executor.shutdownNow().size();
@@ -214,14 +220,7 @@ public class LocalBlockRunner {
         break;
       } else {
         internalApi.afterMasterBeforeWorker(workerPieces);
-        List<List<Vertex<I, V, E>>> verticesPerWorker = new ArrayList<>();
-        for (int i = 0; i < numWorkers; i++) {
-          verticesPerWorker.add(new ArrayList<Vertex<I, V, E>>());
-        }
-        Collection<Vertex<I, V, E>> allVertices = internalApi.getAllVertices();
-        for (Vertex<I, V, E> vertex : allVertices) {
-          verticesPerWorker.get(rand.nextInt(numWorkers)).add(vertex);
-        }
+        List<Partition<I, V, E>> partitions = internalApi.getPartitions();
 
         workerContextLogic.preSuperstep(
             internalWorkerApi,
@@ -229,10 +228,10 @@ public class LocalBlockRunner {
             KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
             internalApi.takeWorkerMessages());
 
-        final CountDownLatch latch = new CountDownLatch(numWorkers);
+        final CountDownLatch latch = new CountDownLatch(numPartitions);
         final AtomicReference<Throwable> exception = new AtomicReference<>();
         anyVertexAlive.set(false);
-        for (final List<Vertex<I, V, E>> curVertices : verticesPerWorker) {
+        for (final Partition<I, V, E> partition : partitions) {
           executor.execute(new Runnable() {
             @Override
             public void run() {
@@ -244,16 +243,28 @@ public class LocalBlockRunner {
                 BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
                 localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
 
-                for (Vertex<I, V, E> vertex : curVertices) {
+                for (Vertex<I, V, E> vertex : partition) {
                   Iterable messages = internalApi.takeMessages(vertex.getId());
                   if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
                     vertex.wakeUp();
                   }
+                  // Equivalent of ComputeCallable.computePartition
                   if (!vertex.isHalted()) {
                     localLogic.compute(vertex, messages);
+
+                    // Need to unwrap the mutated edges (possibly)
+                    vertex.unwrapMutableEdges();
+                    //Compact edges representation if possible
+                    if (vertex instanceof Trimmable) {
+                      ((Trimmable) vertex).trim();
+                    }
+                    // Write vertex to superstep output
+                    // (no-op if it is not used)
                     if (doOutputDuringComputation) {
-                      vertexSaver.saveVertex(vertex);
+                      vertexSaver.writeVertex(vertex);
                     }
+                    // Need to save the vertex changes (possibly)
+                    partition.saveVertex(vertex);
                   }
 
                   if (!vertex.isHalted()) {
@@ -295,14 +306,16 @@ public class LocalBlockRunner {
     }
 
     workerContextLogic.postApplication();
+    internalApi.postApplication();
   }
 
   private static
   <I extends WritableComparable, E extends Writable, V extends Writable>
-  VertexSaver<I, V, E> noOpVertexSaver() {
-    return new VertexSaver<I, V, E>() {
+  SimpleVertexWriter<I, V, E> noOpVertexSaver() {
+    return new SimpleVertexWriter<I, V, E>() {
       @Override
-      public void saveVertex(Vertex<I, V, E> vertex) {
+      public void writeVertex(Vertex<I, V, E> vertex)
+          throws IOException, InterruptedException {
         // No-op
       }
     };

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
deleted file mode 100644
index 0053644..0000000
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.block_app.framework.api.local;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface to use for saving vertices
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public interface VertexSaver<I extends WritableComparable, V extends Writable,
-    E extends Writable> {
-  void saveVertex(Vertex<I, V, E> vertex);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
index 4892a33..a52bb77 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
@@ -126,8 +126,12 @@ public class BlockMasterLogic<S> {
       postApplication();
       return null;
     } else {
-      LOG.info(
-          "Master executing " + previousPiece + ", in superstep " + superstep);
+      boolean logExecutionStatus =
+          BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
+      if (logExecutionStatus) {
+        LOG.info("Master executing " + previousPiece +
+            ", in superstep " + superstep);
+      }
       previousPiece.masterCompute(masterApi);
       ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
           returnAllWriters();
@@ -149,8 +153,10 @@ public class BlockMasterLogic<S> {
       BlockCounters.setStageCounters(
           "Master finished stage: ", previousPiece.getExecutionStage(),
           masterApi);
-      LOG.info(
-          "Master passing next " + nextPiece + ", in superstep " + superstep);
+      if (logExecutionStatus) {
+        LOG.info(
+            "Master passing next " + nextPiece + ", in superstep " + superstep);
+      }
 
       // if there is nothing more to compute, no need for additional superstep
       // this can only happen if application uses no pieces.
@@ -160,8 +166,10 @@ public class BlockMasterLogic<S> {
         result = null;
       } else {
         result = new BlockWorkerPieces<>(previousPiece, nextPiece);
-        LOG.info("Master in " + superstep + " superstep passing " +
-            result + " to be executed");
+        if (logExecutionStatus) {
+          LOG.info("Master in " + superstep + " superstep passing " +
+              result + " to be executed");
+        }
       }
 
       previousPiece = nextPiece;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
index 8b8e174..ca2bb5a 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
@@ -65,8 +65,10 @@ public class BlockWorkerContextLogic {
       BlockWorkerContextSendApi sendApi,
       BlockWorkerPieces workerPieces, long superstep,
       List<Writable> messages) {
-    LOG.info("Worker executing " + workerPieces + " in " + superstep +
-        " superstep");
+    if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
+      LOG.info("Worker executing " + workerPieces + " in " + superstep +
+          " superstep");
+    }
     this.sendApi = sendApi;
     this.workerPieces = workerPieces;
     if (workerPieces.getReceiver() != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
index 3de158a..cd485b4 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
@@ -58,7 +58,6 @@ public class SyntheticGraphInit<I extends WritableComparable,
     this.edgeSupplier = null;
   }
 
-
   @Override
   public void modifyGraph(NumericTestGraph<I, V, E> graph) {
     GiraphConfiguration conf = graph.getConf();
@@ -84,11 +83,5 @@ public class SyntheticGraphInit<I extends WritableComparable,
             i, j, edgeSupplier != null ? edgeSupplier.get() : null);
       }
     }
-
-//    if (vertexModifier != null) {
-//      for (int i = 0; i < numVertices; i++) {
-//        vertexModifier.modifyVertexValue(i, graph.getVertex(i).getValue());
-//      }
-//    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
new file mode 100644
index 0000000..e2c316e
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
@@ -0,0 +1,89 @@
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphChecker;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test when vertex gets multiple simultaneous mutations
+ * (i.e. to non-existent vertex, send a message and do add edge request)
+ * and confirm all mutations are correctly processed
+ */
+public class MultipleSimultanousMutationsTest {
+  @Test
+  public void createVertexOnMsgsTest() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<LongWritable, Writable, LongWritable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
+            graph.addEdge(1, 2, 2);
+          }
+        },
+        new TestGraphChecker<LongWritable, Writable, LongWritable>() {
+          @Override
+          public void checkOutput(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
+            Assert.assertEquals(1, graph.getVertex(1).getNumEdges());
+            Assert.assertNull(graph.getVertex(1).getEdgeValue(new LongWritable(-1)));
+            Assert.assertEquals(2, graph.getVertex(1).getEdgeValue(new LongWritable(2)).get());
+
+            Assert.assertEquals(1, graph.getVertex(2).getNumEdges());
+            Assert.assertEquals(-1, graph.getVertex(2).getEdgeValue(new LongWritable(-1)).get());
+          }
+        },
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, SendingAndAddEdgeBlockFactory.class);
+          }
+        });
+  }
+
+  public static class SendingAndAddEdgeBlockFactory extends TestLongNullNullBlockFactory {
+    @Override
+    protected Class<? extends Writable> getEdgeValueClass(GiraphConfiguration conf) {
+      return LongWritable.class;
+    }
+
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return new Piece<LongWritable, Writable, LongWritable, NullWritable, Object>() {
+        @Override
+        protected Class<NullWritable> getMessageClass() {
+          return NullWritable.class;
+        }
+
+        @Override
+        public VertexSender<LongWritable, Writable, LongWritable> getVertexSender(
+            final BlockWorkerSendApi<LongWritable, Writable, LongWritable, NullWritable> workerApi,
+            Object executionStage) {
+          final ReusableEdge<LongWritable, LongWritable> reusableEdge = workerApi.getConf().createReusableEdge();
+          reusableEdge.setTargetVertexId(new LongWritable(-1));
+          reusableEdge.setValue(new LongWritable(-1));
+          return new VertexSender<LongWritable, Writable, LongWritable>() {
+            @Override
+            public void vertexSend(Vertex<LongWritable, Writable, LongWritable> vertex) {
+              for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
+                workerApi.addEdgeRequest(edge.getTargetVertexId(), reusableEdge);
+                workerApi.sendMessage(edge.getTargetVertexId(), NullWritable.get());
+              }
+            }
+          };
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e74703e..8ad3767 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -873,7 +873,7 @@ public interface GiraphConstants {
           "Overrides default partition count calculation if not -1");
 
   /** Vertex key space size for
-   * {@link org.apache.giraph.partition.SimpleWorkerPartitioner}
+   * {@link org.apache.giraph.partition.WorkerGraphPartitionerImpl}
    */
   String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
deleted file mode 100644
index ebc62f6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.integration;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.partition.BasicPartitionOwner;
-import org.apache.giraph.partition.HashMasterPartitioner;
-import org.apache.giraph.partition.HashPartitionerFactory;
-import org.apache.giraph.partition.MasterGraphPartitioner;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Example graph partitioner that builds on {@link HashMasterPartitioner} to
- * send the partitions to the worker that matches the superstep.  It is for
- * testing only and should never be used in practice.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class SuperstepHashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends HashPartitionerFactory<I, V, E> {
-  /**
-   * Changes the {@link HashMasterPartitioner} to make ownership of the
-   * partitions based on a superstep.  For testing only as it is totally
-   * unbalanced.
-   *
-   * @param <I> vertex id
-   * @param <V> vertex data
-   * @param <E> edge data
-   */
-  private static class SuperstepMasterPartition<I extends WritableComparable,
-      V extends Writable, E extends Writable>
-      extends HashMasterPartitioner<I, V, E> {
-    /** Class logger */
-    private static Logger LOG =
-        Logger.getLogger(SuperstepMasterPartition.class);
-
-    /**
-     * Construction with configuration.
-     *
-     * @param conf Configuration to be stored.
-     */
-    public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) {
-      super(conf);
-    }
-
-    @Override
-    public Collection<PartitionOwner> generateChangedPartitionOwners(
-        Collection<PartitionStats> allPartitionStatsList,
-        Collection<WorkerInfo> availableWorkerInfos,
-        int maxWorkers,
-        long superstep) {
-      // Assign all the partitions to
-      // superstep mod availableWorkerInfos
-      // Guaranteed to be different if the workers (and their order)
-      // do not change
-      long workerIndex = superstep % availableWorkerInfos.size();
-      int i = 0;
-      WorkerInfo chosenWorkerInfo = null;
-      for (WorkerInfo workerInfo : availableWorkerInfos) {
-        if (workerIndex == i) {
-          chosenWorkerInfo = workerInfo;
-        }
-        ++i;
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("generateChangedPartitionOwners: Chosen worker " +
-                 "for superstep " + superstep + " is " +
-                 chosenWorkerInfo);
-      }
-
-      List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
-      for (PartitionOwner partitionOwner :
-        getCurrentPartitionOwners()) {
-        WorkerInfo prevWorkerinfo =
-          partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
-            null : partitionOwner.getWorkerInfo();
-        PartitionOwner tmpPartitionOwner =
-          new BasicPartitionOwner(partitionOwner.getPartitionId(),
-                                  chosenWorkerInfo,
-                                  prevWorkerinfo,
-                                  null);
-        partitionOwnerList.add(tmpPartitionOwner);
-        LOG.info("partition owner was " + partitionOwner +
-            ", new " + tmpPartitionOwner);
-      }
-      setPartitionOwnerList(partitionOwnerList);
-      return partitionOwnerList;
-    }
-  }
-
-  @Override
-  public MasterGraphPartitioner<I, V, E>
-  createMasterGraphPartitioner() {
-    return new SuperstepMasterPartition<I, V, E>(getConf());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java b/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
deleted file mode 100644
index 4c6ae30..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of all helper integration test objects.
- */
-package org.apache.giraph.integration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index c5e2f3e..5726d25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -26,34 +26,98 @@ import org.apache.hadoop.io.WritableComparable;
 /**
  * Defines the partitioning framework for this application.
  *
- * @param <I> Vertex index value
+ * Abstracts and implements all GraphPartitionerFactoryInterface logic
+ * on top of two functions which define partitioning scheme:
+ * - which partition vertex should be in, and
+ * - which partition should belong to which worker
+ *
+ * @param <I> Vertex id value
  * @param <V> Vertex value
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable> extends
-    ImmutableClassesGiraphConfigurable<I, V, E> {
+public abstract class GraphPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements GraphPartitionerFactoryInterface<I, V, E>  {
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+  }
+
+  @Override
+  public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+    return new MasterGraphPartitionerImpl<I, V, E>(getConf()) {
+      @Override
+      protected int getWorkerIndex(int partition, int partitionCount,
+          int workerCount) {
+        return GraphPartitionerFactory.this.getWorker(
+            partition, partitionCount, workerCount);
+      }
+    };
+  }
+
+  @Override
+  public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+    return new WorkerGraphPartitionerImpl<I, V, E>() {
+      @Override
+      protected int getPartitionIndex(I id, int partitionCount,
+        int workerCount) {
+        return GraphPartitionerFactory.this.getPartition(id,
+            partitionCount, workerCount);
+      }
+    };
+  }
+
+  /**
+   * Calculates which partition the given vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return partition
+   */
+  public abstract int getPartition(I id, int partitionCount,
+    int workerCount);
 
   /**
-   * Use some local data present in the worker
+   * Calculates worker that should be responsible for passed partition.
    *
-   * @param localData localData present in the worker
+   * @param partition Current partition
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return index of worker responsible for current partition
    */
-  void initialize(LocalData<I, V, E, ? extends Writable> localData);
+  public abstract int getWorker(
+      int partition, int partitionCount, int workerCount);
+
   /**
-   * Create the {@link MasterGraphPartitioner} used by the master.
-   * Instantiated once by the master and reused.
+   * Utility function for calculating which partition a value
+   * from interval [0, max) belongs to.
    *
-   * @return Instantiated master graph partitioner
+   * @param value Value for which partition is requested
+   * @param max Exclusive upper bound of the value interval
+   * @param partitions Number of partitions, equally sized.
+   * @return Index of partition where value belongs to.
    */
-  MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner();
+  public static int getPartitionInRange(int value, int max, int partitions) {
+    double keyRange = ((double) max) / partitions;
+    int part = (int) ((value % max) / keyRange);
+    return Math.max(0, Math.min(partitions - 1, part));
+  }
 
   /**
-   * Create the {@link WorkerGraphPartitioner} used by the worker.
-   * Instantiated once by every worker and reused.
+   * Utility function for calculating which partition a value
+   * from interval [0, max) belongs to.
    *
-   * @return Instantiated worker graph partitioner
+   * @param value Value for which partition is requested
+   * @param max Exclusive upper bound of the value interval
+   * @param partitions Number of partitions, equally sized.
+   * @return Index of partition where value belongs to.
    */
-  WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner();
+  public static int getPartitionInRange(long value, long max, int partitions) {
+    double keyRange = ((double) max) / partitions;
+    int part = (int) ((value % max) / keyRange);
+    return Math.max(0, Math.min(partitions - 1, part));
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
new file mode 100644
index 0000000..5551100
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Defines the partitioning framework for this application.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface GraphPartitionerFactoryInterface<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    ImmutableClassesGiraphConfigurable<I, V, E> {
+
+  /**
+   * Use some local data present in the worker
+   *
+   * @param localData localData present in the worker
+   */
+  void initialize(LocalData<I, V, E, ? extends Writable> localData);
+  /**
+   * Create the {@link MasterGraphPartitioner} used by the master.
+   * Instantiated once by the master and reused.
+   *
+   * @return Instantiated master graph partitioner
+   */
+  MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner();
+
+  /**
+   * Create the {@link WorkerGraphPartitioner} used by the worker.
+   * Instantiated once by every worker and reused.
+   *
+   * @return Instantiated worker graph partitioner
+   */
+  WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
deleted file mode 100644
index 607347d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Master will execute a hash based partitioning.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements
-    MasterGraphPartitioner<I, V, E> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-  /** Provided configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Save the last generated partition owner list */
-  private List<PartitionOwner> partitionOwnerList;
-
-  /**
-   * Constructor.
-   *
-   *@param conf Configuration used.
-   */
-  public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Collection<PartitionOwner> createInitialPartitionOwners(
-      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-    int partitionCount = PartitionUtils.computePartitionCount(
-        availableWorkerInfos.size(), conf);
-    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
-    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
-    for (int i = 0; i < partitionCount; ++i) {
-      PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
-      if (!workerIt.hasNext()) {
-        workerIt = availableWorkerInfos.iterator();
-      }
-      ownerList.add(owner);
-    }
-    this.partitionOwnerList = ownerList;
-    return ownerList;
-  }
-
-  @Override
-  public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
-    this.partitionOwnerList = Lists.newArrayList(partitionOwners);
-  }
-
-  @Override
-  public Collection<PartitionOwner> getCurrentPartitionOwners() {
-    return partitionOwnerList;
-  }
-
-  /**
-   * Subclasses can set the partition owner list.
-   *
-   * @param partitionOwnerList New partition owner list.
-   */
-  protected void setPartitionOwnerList(List<PartitionOwner>
-  partitionOwnerList) {
-    this.partitionOwnerList = partitionOwnerList;
-  }
-
-  @Override
-  public Collection<PartitionOwner> generateChangedPartitionOwners(
-      Collection<PartitionStats> allPartitionStatsList,
-      Collection<WorkerInfo> availableWorkerInfos,
-      int maxWorkers,
-      long superstep) {
-    return PartitionBalancer.balancePartitionsAcrossWorkers(
-        conf,
-        partitionOwnerList,
-        allPartitionStatsList,
-        availableWorkerInfos);
-  }
-
-  @Override
-  public PartitionStats createPartitionStats() {
-    return new PartitionStats();
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index 221e50d..17aec51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -15,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -27,27 +24,21 @@ import org.apache.hadoop.io.WritableComparable;
  * Divides the vertices into partitions by their hash code using a simple
  * round-robin hash for great balancing if given a random hash code.
  *
- * @param <I> Vertex index value
+ * @param <I> Vertex id value
  * @param <V> Vertex value
  * @param <E> Edge value
  */
-@SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
-  V extends Writable, E extends Writable>
-  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-  implements GraphPartitionerFactory<I, V, E>  {
-
-  @Override
-  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
-  }
+    V extends Writable, E extends Writable>
+    extends GraphPartitionerFactory<I, V, E> {
 
   @Override
-  public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new HashMasterPartitioner<I, V, E>(getConf());
+  public int getPartition(I id, int partitionCount, int workerCount) {
+    return Math.abs(id.hashCode() % partitionCount);
   }
 
   @Override
-  public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
-    return new HashWorkerPartitioner<I, V, E>();
+  public int getWorker(int partition, int partitionCount, int workerCount) {
+    return partition % workerCount;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 5f7ee40..ef65800 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -18,11 +18,11 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.primitives.UnsignedInts;
+
 /**
  * Divides the vertices into partitions by their hash code using ranges of the
  * hash space.
@@ -33,21 +33,22 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
-  V extends Writable, E extends Writable>
-  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-  implements GraphPartitionerFactory<I, V, E> {
+    V extends Writable, E extends Writable>
+    extends GraphPartitionerFactory<I, V, E> {
 
-  @Override
-  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
-  }
+  /** A transformed hashCode() must be strictly smaller than this. */
+  private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
 
   @Override
-  public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new HashMasterPartitioner<I, V, E>(getConf());
+  public int getPartition(I id, int partitionCount, int workerCount) {
+    long unsignedHashCode = UnsignedInts.toLong(id.hashCode());
+    // The reader can verify that unsignedHashCode of HASH_LIMIT - 1 yields
+    // index of partitionCount - 1, and unsignedHashCode of 0 yields index 0.
+    return (int) ((unsignedHashCode * partitionCount) / HASH_LIMIT);
   }
 
   @Override
-  public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
-    return new HashRangeWorkerPartitioner<I, V, E>();
+  public int getWorker(int partition, int partitionCount, int workerCount) {
+    return partition % workerCount;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
deleted file mode 100644
index 81c3d7d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.primitives.UnsignedInts;
-
-/**
- * Implements range-based partitioning from the id hash code.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashRangeWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends HashWorkerPartitioner<I, V, E> {
-  /** A transformed hashCode() must be strictly smaller than this. */
-  private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    long unsignedHashCode = UnsignedInts.toLong(vertexId.hashCode());
-    // The reader can verify that unsignedHashCode of HASH_LIMIT - 1 yields
-    // index of size - 1, and unsignedHashCode of 0 yields index of 0.
-    int index = (int)
-        ((unsignedHashCode * getPartitionOwners().size()) / HASH_LIMIT);
-    return partitionOwnerList.get(index);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
deleted file mode 100644
index 12aa417..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Implements hash-based partitioning from the id hash code.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements WorkerGraphPartitioner<I, V, E> {
-  /**
-   * Mapping of the vertex ids to {@link PartitionOwner}.
-   */
-  protected List<PartitionOwner> partitionOwnerList =
-      Lists.newArrayList();
-
-  @Override
-  public PartitionOwner createPartitionOwner() {
-    return new BasicPartitionOwner();
-  }
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    return partitionOwnerList.get(
-        Math.abs(vertexId.hashCode() % partitionOwnerList.size()));
-  }
-
-  @Override
-  public Collection<PartitionStats> finalizePartitionStats(
-      Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E> partitionStore) {
-    // No modification necessary
-    return workerPartitionStats;
-  }
-
-  @Override
-  public PartitionExchange updatePartitionOwners(
-      WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
-    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
-        myWorkerInfo, masterSetPartitionOwners);
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return partitionOwnerList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
index e129050..98d1285 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
@@ -31,7 +31,7 @@ import org.apache.log4j.Logger;
  */
 @SuppressWarnings("unchecked")
 public class LongMappingStorePartitionerFactory<V extends Writable,
-    E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+    E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
   /** Logger Instance */
   private static final Logger LOG = Logger.getLogger(
       LongMappingStorePartitionerFactory.class);
@@ -46,14 +46,14 @@ public class LongMappingStorePartitionerFactory<V extends Writable,
   }
 
   @Override
-  protected int getPartition(LongWritable id, int partitionCount,
+  public int getPartition(LongWritable id, int partitionCount,
     int workerCount) {
     return localData.getMappingStoreOps().getPartition(id,
         partitionCount, workerCount);
   }
 
   @Override
-  protected int getWorker(int partition, int partitionCount, int workerCount) {
+  public int getWorker(int partition, int partitionCount, int workerCount) {
     int numRows = partitionCount / workerCount;
     numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
     return partition / numRows;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
new file mode 100644
index 0000000..b9916dc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Implements every {@link MasterGraphPartitioner} operation on top of a single
+ * subclass-supplied mapping, {@link #getWorkerIndex(int, int, int)}.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class MasterGraphPartitionerImpl<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements MasterGraphPartitioner<I, V, E> {
+  /** Configuration supplied at construction */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Partition owner list last created or set (null until then) */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf
+   *          Configuration used for partition counting and balancing.
+   */
+  public MasterGraphPartitionerImpl(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    int partitionCount = PartitionUtils.computePartitionCount(
+        availableWorkerInfos.size(), conf);
+    ArrayList<WorkerInfo> workerList =
+        new ArrayList<WorkerInfo>(availableWorkerInfos);
+
+    partitionOwnerList = new ArrayList<PartitionOwner>();
+    for (int i = 0; i < partitionCount; i++) {
+      partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
+          getWorkerIndex(i, partitionCount, workerList.size()))));
+    }
+
+    return partitionOwnerList;
+  }
+
+  @Override
+  public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+    partitionOwnerList = Lists.newArrayList(partitionOwners);
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
+        partitionOwnerList, allPartitionStatsList, availableWorkers);
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+
+  /**
+   * Calculates the worker that should own the given partition.
+   *
+   * @param partition Partition index, from [0, partitionCount)
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return index into the worker list, expected in [0, workerCount)
+   */
+  protected abstract int getWorkerIndex(
+      int partition, int partitionCount, int workerCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 5dd580b..6d1dcb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Writable;
  * @param <E> Edge value type
  */
 public class SimpleIntRangePartitionerFactory<V extends Writable,
-  E extends Writable> extends SimplePartitionerFactory<IntWritable, V, E> {
+  E extends Writable> extends GraphPartitionerFactory<IntWritable, V, E> {
 
   /** Vertex key space size. */
   private int keySpaceSize;
 
   @Override
-  protected int getPartition(IntWritable id, int partitionCount,
+  public int getPartition(IntWritable id, int partitionCount,
     int workerCount) {
     return getPartition(id, partitionCount);
   }
@@ -56,7 +56,7 @@ public class SimpleIntRangePartitionerFactory<V extends Writable,
   }
 
   @Override
-  protected int getWorker(int partition, int partitionCount, int workerCount) {
+  public int getWorker(int partition, int partitionCount, int workerCount) {
     return getPartitionInRange(partition, partitionCount, workerCount);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index e637e16..9dee3d1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Writable;
  * @param <E> Edge value type
  */
 public class SimpleLongRangePartitionerFactory<V extends Writable,
-  E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+  E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
 
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
-  protected int getPartition(LongWritable id, int partitionCount,
+  public int getPartition(LongWritable id, int partitionCount,
     int workerCount) {
     return getPartition(id, partitionCount);
   }
@@ -56,7 +56,7 @@ public class SimpleLongRangePartitionerFactory<V extends Writable,
   }
 
   @Override
-  protected int getWorker(int partition, int partitionCount, int workerCount) {
+  public int getWorker(int partition, int partitionCount, int workerCount) {
     return getPartitionInRange(partition, partitionCount, workerCount);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
deleted file mode 100644
index 638dacf..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Abstracts and implements all MasterGraphPartitioner logic on top of a single
- * user function - getWorkerIndex.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements MasterGraphPartitioner<I, V, E> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-  /** Provided configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Save the last generated partition owner list */
-  private List<PartitionOwner> partitionOwnerList;
-
-  /**
-   * Constructor.
-   *
-   * @param conf
-   *          Configuration used.
-   */
-  public SimpleMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Collection<PartitionOwner> createInitialPartitionOwners(
-      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-    int partitionCount = PartitionUtils.computePartitionCount(
-        availableWorkerInfos.size(), conf);
-    ArrayList<WorkerInfo> workerList =
-        new ArrayList<WorkerInfo>(availableWorkerInfos);
-
-    partitionOwnerList = new ArrayList<PartitionOwner>();
-    for (int i = 0; i < partitionCount; i++) {
-      partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
-          getWorkerIndex(i, partitionCount, workerList.size()))));
-    }
-
-    return partitionOwnerList;
-  }
-
-  @Override
-  public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
-    partitionOwnerList = Lists.newArrayList(partitionOwners);
-  }
-
-  @Override
-  public Collection<PartitionOwner> generateChangedPartitionOwners(
-      Collection<PartitionStats> allPartitionStatsList,
-      Collection<WorkerInfo> availableWorkers,
-      int maxWorkers,
-      long superstep) {
-    return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
-        partitionOwnerList, allPartitionStatsList, availableWorkers);
-  }
-
-  @Override
-  public Collection<PartitionOwner> getCurrentPartitionOwners() {
-    return partitionOwnerList;
-  }
-
-  @Override
-  public PartitionStats createPartitionStats() {
-    return new PartitionStats();
-  }
-
-  /**
-   * Calculates worker that should be responsible for passed partition.
-   *
-   * @param partition Current partition
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of workers
-   * @return index of worker responsible for current partition
-   */
-  protected abstract int getWorkerIndex(
-      int partition, int partitionCount, int workerCount);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
deleted file mode 100644
index 1e29846..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstracts and implements all GraphPartitionerFactory logic on top of two
- * functions which define partitioning scheme:
- * - which partition user should be in, and
- * - which partition should belong to which worker
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public abstract class SimplePartitionerFactory<I extends WritableComparable,
-  V extends Writable, E extends Writable>
-  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-  implements GraphPartitionerFactory<I, V, E> {
-
-  @Override
-  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
-  }
-
-  @Override
-  public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new SimpleMasterPartitioner<I, V, E>(getConf()) {
-      @Override
-      protected int getWorkerIndex(int partition, int partitionCount,
-          int workerCount) {
-        return SimplePartitionerFactory.this.getWorker(
-            partition, partitionCount, workerCount);
-      }
-    };
-  }
-
-  @Override
-  public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
-    return new SimpleWorkerPartitioner<I, V, E>() {
-      @Override
-      protected int getPartitionIndex(I id, int partitionCount,
-        int workerCount) {
-        return SimplePartitionerFactory.this.getPartition(id,
-            partitionCount, workerCount);
-      }
-    };
-  }
-
-  /**
-   * Calculates in which partition current vertex belongs to,
-   * from interval [0, partitionCount).
-   *
-   * @param id Vertex id
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of workers
-   * @return partition
-   */
-  protected abstract int getPartition(I id, int partitionCount,
-    int workerCount);
-
-  /**
-   * Calculates worker that should be responsible for passed partition.
-   *
-   * @param partition Current partition
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of workers
-   * @return index of worker responsible for current partition
-   */
-  protected abstract int getWorker(
-      int partition, int partitionCount, int workerCount);
-
-  /**
-   * Utility function for calculating in which partition value
-   * from interval [0, max) should belong to.
-   *
-   * @param value Value for which partition is requested
-   * @param max Maximum possible value
-   * @param partitions Number of partitions, equally sized.
-   * @return Index of partition where value belongs to.
-   */
-  public static int getPartitionInRange(int value, int max, int partitions) {
-    double keyRange = ((double) max) / partitions;
-    int part = (int) ((value % max) / keyRange);
-    return Math.max(0, Math.min(partitions - 1, part));
-  }
-
-  /**
-   * Utility function for calculating in which partition value
-   * from interval [0, max) should belong to.
-   *
-   * @param value Value for which partition is requested
-   * @param max Maximum possible value
-   * @param partitions Number of partitions, equally sized.
-   * @return Index of partition where value belongs to.
-   */
-  public static int getPartitionInRange(long value, long max, int partitions) {
-    double keyRange = ((double) max) / partitions;
-    int part = (int) ((value % max) / keyRange);
-    return Math.max(0, Math.min(partitions - 1, part));
-  }
-}


Mime
View raw message