giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to cdb49fd
Date Wed, 08 Jan 2014 20:04:42 GMT
Updated Branches:
  refs/heads/trunk 4ce0f6a0d -> cdb49fd5f


GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/cdb49fd5
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/cdb49fd5
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/cdb49fd5

Branch: refs/heads/trunk
Commit: cdb49fd5f76027a4a66930e6b266ba79266c9852
Parents: 4ce0f6a
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Wed Jan 8 12:04:23 2014 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Wed Jan 8 12:04:23 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../apache/giraph/partition/PartitionUtils.java |  15 ++-
 .../partition/RangeMasterPartitioner.java       |  42 -------
 .../giraph/partition/RangePartitionOwner.java   |  90 --------------
 .../giraph/partition/RangePartitionStats.java   |  68 -----------
 .../partition/RangePartitionerFactory.java      |  42 -------
 .../apache/giraph/partition/RangeSplitHint.java |  72 -----------
 .../partition/RangeWorkerPartitioner.java       |  77 ------------
 .../SimpleIntRangePartitionerFactory.java       |  45 +++----
 .../SimpleLongRangePartitionerFactory.java      |  43 +++----
 .../partition/SimpleMasterPartitioner.java      | 106 ++++++++++++++++
 .../partition/SimplePartitionerFactory.java     | 122 +++++++++++++++++++
 .../partition/SimpleRangeMasterPartitioner.java | 116 ------------------
 .../partition/SimpleRangeWorkerPartitioner.java | 107 ----------------
 .../partition/SimpleWorkerPartitioner.java      |  85 +++++++++++++
 .../SimpleRangePartitionFactoryTest.java        | 117 ++++++++++++++++++
 16 files changed, 473 insertions(+), 676 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d0259e3..b29aa66 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo)
+
   GIRAPH-815: Exclude dependency and duplicate finder checks to profile we do not check (aching)
 
   GIRAPH-798: Upgrade Giraph to Java7 and fix all dependencies (aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index b055f4d..68bc2de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -195,11 +195,20 @@ public class PartitionUtils {
     }
     int maxPartitions = getMaxPartitions(conf);
     if (partitionCount > maxPartitions) {
+      // try to keep partitionCount divisible by number of workers
+      // in order to keep the balance
+      int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) *
+          availableWorkerInfos.size();
+      if (reducedPartitions == 0) {
+        reducedPartitions = maxPartitions;
+      }
       LOG.warn("computePartitionCount: " +
-          "Reducing the partitionCount to " + maxPartitions +
-          " from " + partitionCount);
-      partitionCount = maxPartitions;
+          "Reducing the partitionCount to " + reducedPartitions +
+          " from " + partitionCount + " because of " + maxPartitions +
+          " limit");
+      partitionCount = reducedPartitions;
     }
+
     return partitionCount;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
deleted file mode 100644
index 3911a95..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Some functionality is provided, but this is meant for developers to
- * determine the partitioning based on the actual types of data.  The
- * implementation of several methods are left to the developer who is trying
- * to control the amount of messages sent from one worker to another.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements
-    MasterGraphPartitioner<I, V, E> {
-  @Override
-  public PartitionStats createPartitionStats() {
-    return new RangePartitionStats<I>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
deleted file mode 100644
index e7e03dc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Added the max key index in to the {@link PartitionOwner}.  Also can provide
- * a split hint if desired.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionOwner<I extends WritableComparable>
-    extends BasicPartitionOwner {
-  /** Max index for this partition */
-  private I maxIndex;
-
-  /**
-   * Default constructor.
-   */
-  public RangePartitionOwner() { }
-
-  /**
-   * Constructor with the max index.
-   *
-   * @param maxIndex Max index of this partition.
-   */
-  public RangePartitionOwner(I maxIndex) {
-    this.maxIndex = maxIndex;
-  }
-
-  /**
-   * Get the maximum index of this partition owner.
-   *
-   * @return Maximum index.
-   */
-  public I getMaxIndex() {
-    return maxIndex;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-    maxIndex = (I) getConf().createVertexId();
-    maxIndex.readFields(input);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-    maxIndex.write(output);
-  }
-
-  @Override
-  public void writeWithWorkerIds(DataOutput output) throws IOException {
-    super.writeWithWorkerIds(output);
-    maxIndex.write(output);
-  }
-
-  @Override
-  public void readFieldsWithWorkerIds(DataInput input,
-      Map<Integer, WorkerInfo> workerInfoMap) throws IOException {
-    super.readFieldsWithWorkerIds(input, workerInfoMap);
-    maxIndex = (I) getConf().createVertexId();
-    maxIndex.readFields(input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
deleted file mode 100644
index 73af816..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Same as {@link PartitionStats}, but also includes the hint for range-based
- * partitioning.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionStats<I extends WritableComparable>
-    extends PartitionStats {
-  /** Can be null if no hint, otherwise a splitting hint */
-  private RangeSplitHint<I> hint;
-
-  /**
-   * Get the range split hint (if any)
-   *
-   * @return Hint of how to split the range if desired, null otherwise
-   */
-  public RangeSplitHint<I> getRangeSplitHint() {
-    return hint;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-    boolean hintExists = input.readBoolean();
-    if (hintExists) {
-      hint = new RangeSplitHint<I>();
-      hint.readFields(input);
-    } else {
-      hint = null;
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-    output.writeBoolean(hint != null);
-    if (hint != null) {
-      hint.write(output);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
deleted file mode 100644
index 2ec4d4a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Range partitioning will split the vertices by a key range based on a generic
- * type.  This allows vertices that have some locality with the vertex ids
- * to reduce the amount of messages sent.  The tradeoffs are that
- * range partitioning is more susceptible to hot spots if the keys
- * are not randomly distributed.  Another negative is the user must implement
- * some of the functionality around how to split the key range.
- *
- * See {@link RangeWorkerPartitioner}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
deleted file mode 100644
index 4317944..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Hint to the {@link RangeMasterPartitioner} about how a
- * {@link RangePartitionOwner} can be split.
- *
- * @param <I> Vertex index to split around
- */
-@SuppressWarnings("rawtypes")
-public class RangeSplitHint<I extends WritableComparable>
-    implements Writable, ImmutableClassesGiraphConfigurable {
-  /** Hinted split index */
-  private I splitIndex;
-  /** Number of vertices in this range before the split */
-  private long preSplitVertexCount;
-  /** Number of vertices in this range after the split */
-  private long postSplitVertexCount;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    splitIndex = conf.createVertexId();
-    splitIndex.readFields(input);
-    preSplitVertexCount = input.readLong();
-    postSplitVertexCount = input.readLong();
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    splitIndex.write(output);
-    output.writeLong(preSplitVertexCount);
-    output.writeLong(postSplitVertexCount);
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
deleted file mode 100644
index cbcd753..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Range partitioning will split the vertices by a key range based on a generic
- * type.  This allows vertices that have some locality with the vertex ids
- * to reduce the amount of messages sent.  The tradeoffs are that
- * range partitioning is more susceptible to hot spots if the keys
- * are not randomly distributed.  Another negative is the user must implement
- * some of the functionality around how to split the key range.
- *
- * Note:  This implementation is incomplete, the developer must implement the
- * various methods based on their index type.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements
-    WorkerGraphPartitioner<I, V, E> {
-  /** Mapping of the vertex ids to the {@link PartitionOwner} */
-  protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
-      new TreeMap<I, RangePartitionOwner<I>>();
-
-  @Override
-  public PartitionOwner createPartitionOwner() {
-    return new RangePartitionOwner<I>();
-  }
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    // Find the partition owner based on the maximum partition id.
-    // If the vertex id exceeds any of the maximum partition ids, give
-    // it to the last one
-    if (vertexId == null) {
-      throw new IllegalArgumentException(
-          "getPartitionOwner: Illegal null vertex id");
-    }
-    I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
-    if (maxVertexIndex == null) {
-      return vertexRangeMap.lastEntry().getValue();
-    } else {
-      return vertexRangeMap.get(vertexId);
-    }
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return vertexRangeMap.values();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 7aee84c..8ab692f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleIntRangePartitionerFactory<V extends Writable,
-    E extends Writable>
-    implements GraphPartitionerFactory<IntWritable, V, E> {
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration conf;
+public class SimpleIntRangePartitionerFactory
+    <V extends Writable, E extends Writable>
+    extends SimplePartitionerFactory<IntWritable, V, E> {
+
   /** Vertex key space size. */
-  private long keySpaceSize;
+  private int keySpaceSize;
 
   @Override
-  public MasterGraphPartitioner<IntWritable, V, E>
-  createMasterGraphPartitioner() {
-    return new SimpleRangeMasterPartitioner<IntWritable, V, E>(conf);
+  protected int getPartition(IntWritable id, int partitionCount) {
+    return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }
 
   @Override
-  public WorkerGraphPartitioner<IntWritable, V, E>
-  createWorkerGraphPartitioner() {
-    return new SimpleRangeWorkerPartitioner<IntWritable, V, E>(
-        keySpaceSize) {
-      @Override
-      protected long vertexKeyFromId(IntWritable id) {
-        // The modulo is just a safeguard in case keySpaceSize is incorrect.
-        return id.get() % keySpaceSize;
-      }
-    };
+  protected int getWorker(int partition, int partitionCount, int workerCount) {
+    return getPartitionInRange(partition, partitionCount, workerCount);
   }
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-    keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
-        -1);
+    super.setConf(conf);
+    keySpaceSize =
+        conf.getInt(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1);
     if (keySpaceSize == -1) {
-      throw new IllegalStateException("Need to specify " + GiraphConstants
-          .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
-          "SimpleRangePartitioner");
+      throw new IllegalStateException("Need to specify " +
+          GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE +
+          " when using SimpleIntRangePartitionerFactory");
     }
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 64efde9..2989598 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleLongRangePartitionerFactory<V extends Writable,
-    E extends Writable>
-    implements GraphPartitionerFactory<LongWritable, V, E> {
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration conf;
+public class SimpleLongRangePartitionerFactory
+    <V extends Writable, E extends Writable>
+    extends SimplePartitionerFactory<LongWritable, V, E> {
+
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
-  public MasterGraphPartitioner<LongWritable, V, E>
-  createMasterGraphPartitioner() {
-    return new SimpleRangeMasterPartitioner<LongWritable, V, E>(conf);
+  protected int getPartition(LongWritable id, int partitionCount) {
+    return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }
 
   @Override
-  public WorkerGraphPartitioner<LongWritable, V, E>
-  createWorkerGraphPartitioner() {
-    return new SimpleRangeWorkerPartitioner<LongWritable, V, E>(
-        keySpaceSize) {
-      @Override
-      protected long vertexKeyFromId(LongWritable id) {
-        // The modulo is just a safeguard in case keySpaceSize is incorrect.
-        return id.get() % keySpaceSize;
-      }
-    };
+  protected int getWorker(int partition, int partitionCount, int workerCount) {
+    return getPartitionInRange(partition, partitionCount, workerCount);
   }
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-    keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
-        -1);
+    super.setConf(conf);
+    keySpaceSize =
+          conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1);
     if (keySpaceSize == -1) {
-      throw new IllegalStateException("Need to specify " + GiraphConstants
-          .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
-          "SimpleRangePartitioner");
+      throw new IllegalStateException("Need to specify " +
+          GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE +
+          " when using SimpleLongRangePartitionerFactory");
     }
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
new file mode 100644
index 0000000..f128f34
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstracts and implements all MasterGraphPartitioner logic on top of a single
+ * user function - getWorkerIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class SimpleMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements MasterGraphPartitioner<I, V, E> {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
+  /** Provided configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Save the last generated partition owner list */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf
+   *          Configuration used.
+   */
+  public SimpleMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    int partitionCount = PartitionUtils.computePartitionCount(
+        availableWorkerInfos, maxWorkers, conf);
+    ArrayList<WorkerInfo> workerList =
+        new ArrayList<WorkerInfo>(availableWorkerInfos);
+
+    partitionOwnerList = new ArrayList<PartitionOwner>();
+    for (int i = 0; i < partitionCount; i++) {
+      partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
+          getWorkerIndex(i, partitionCount, workerList.size()))));
+    }
+
+    return partitionOwnerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
+        partitionOwnerList, allPartitionStatsList, availableWorkers);
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+
+  /**
+   * Calculates worker that should be responsible for passed partition.
+   *
+   * @param partition Current partition
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return index of worker responsible for current partition
+   */
+  protected abstract int getWorkerIndex(
+      int partition, int partitionCount, int workerCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
new file mode 100644
index 0000000..15b0756
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstracts and implements all GraphPartitionerFactory logic on top of two
+ * functions which define the partitioning scheme:
+ * - which partition a vertex should be in, and
+ * - which worker each partition should belong to
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public abstract class SimplePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements GraphPartitionerFactory<I, V, E> {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  @Override
+  public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+    return new SimpleMasterPartitioner<I, V, E>(conf) {
+      @Override
+      protected int getWorkerIndex(int partition, int partitionCount,
+          int workerCount) {
+        return SimplePartitionerFactory.this.getWorker(
+            partition, partitionCount, workerCount);
+      }
+    };
+  }
+
+  @Override
+  public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+    return new SimpleWorkerPartitioner<I, V, E>() {
+      @Override
+      protected int getPartitionIndex(I id, int partitionCount) {
+        return SimplePartitionerFactory.this.getPartition(id, partitionCount);
+      }
+    };
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public final ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Calculates which partition the current vertex belongs to,
+   * from the interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
+  protected abstract int getPartition(I id, int partitionCount);
+  /**
+   * Calculates the worker that should be responsible for the passed partition.
+   *
+   * @param partition Current partition
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return index of worker responsible for current partition
+   */
+  protected abstract int getWorker(
+      int partition, int partitionCount, int workerCount);
+
+  /**
+   * Utility function for calculating which partition a value
+   * from the interval [0, max) belongs to.
+   *
+   * @param value Value for which partition is requested
+   * @param max Exclusive upper bound of the value interval
+   * @param partitions Number of partitions, equally sized.
+   * @return Index of the partition the value belongs to.
+   */
+  public static int getPartitionInRange(int value, int max, int partitions) {
+    double keyRange = ((double) max) / partitions;
+    int part = (int) ((value % max) / keyRange);
+    return Math.max(0, Math.min(partitions - 1, part));
+  }
+
+  /**
+   * Utility function for calculating which partition a value
+   * from the interval [0, max) belongs to.
+   *
+   * @param value Value for which partition is requested
+   * @param max Exclusive upper bound of the value interval
+   * @param partitions Number of partitions, equally sized.
+   * @return Index of the partition the value belongs to.
+   */
+  public static int getPartitionInRange(long value, long max, int partitions) {
+    double keyRange = ((double) max) / partitions;
+    int part = (int) ((value % max) / keyRange);
+    return Math.max(0, Math.min(partitions - 1, part));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
deleted file mode 100644
index 37ce8c7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A range-based master partitioner where equal-sized ranges of partitions
- * are deterministically assigned to workers.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public class SimpleRangeMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements
-    MasterGraphPartitioner<I, V, E> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-  /** Provided configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Save the last generated partition owner list */
-  private List<PartitionOwner> partitionOwnerList;
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration used.
-   */
-  public SimpleRangeMasterPartitioner(
-      ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Collection<PartitionOwner> createInitialPartitionOwners(
-      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-    int partitionCount = PartitionUtils.computePartitionCount(
-        availableWorkerInfos, maxWorkers, conf);
-    int rangeSize = partitionCount / availableWorkerInfos.size();
-
-    partitionOwnerList = new ArrayList<PartitionOwner>();
-    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
-    WorkerInfo currentWorker = null;
-
-    int i = 0;
-    for (; i < partitionCount; ++i) {
-      if (i % rangeSize == 0) {
-        if (!workerIt.hasNext()) {
-          break;
-        }
-        currentWorker = workerIt.next();
-      }
-      partitionOwnerList.add(new BasicPartitionOwner(i, currentWorker));
-    }
-
-    // Distribute the remainder among all workers.
-    if (i < partitionCount) {
-      workerIt = availableWorkerInfos.iterator();
-      for (; i < partitionCount; ++i) {
-        partitionOwnerList.add(new BasicPartitionOwner(i, workerIt.next()));
-      }
-    }
-
-    return partitionOwnerList;
-  }
-
-  @Override
-  public Collection<PartitionOwner> generateChangedPartitionOwners(
-      Collection<PartitionStats> allPartitionStatsList,
-      Collection<WorkerInfo> availableWorkers,
-      int maxWorkers,
-      long superstep) {
-    return PartitionBalancer.balancePartitionsAcrossWorkers(
-        conf,
-        partitionOwnerList,
-        allPartitionStatsList,
-        availableWorkers);
-  }
-
-  @Override
-  public Collection<PartitionOwner> getCurrentPartitionOwners() {
-    return partitionOwnerList;
-  }
-
-  @Override
-  public PartitionStats createPartitionStats() {
-    return new PartitionStats();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
deleted file mode 100644
index ab2afd5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A range-based worker partitioner where equal-sized ranges of vertex ids
- * are deterministically assigned to partitions.
- * The user has to define a mapping from vertex ids to long keys dense in
- * [0, keySpaceSize).
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleRangeWorkerPartitioner<I extends
-    WritableComparable, V extends Writable, E extends Writable>
-    implements WorkerGraphPartitioner<I, V, E> {
-  /** List of {@link PartitionOwner}s for this worker. */
-  private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
-  /** Vertex keys space size. */
-  private long keySpaceSize;
-
-  /**
-   * Constructor.
-   *
-   * @param keySpaceSize Vertex keys space size.
-   */
-  public SimpleRangeWorkerPartitioner(long keySpaceSize) {
-    this.keySpaceSize = keySpaceSize;
-  }
-
-  /**
-   * Get key space size (can be used when implementing vertexKeyFromId()).
-   *
-   * @return Key space size.
-   */
-  public long getKeySpaceSize() {
-    return keySpaceSize;
-  }
-
-  /**
-   * Convert a vertex id to a unique long key in [0, keySpaceSize].
-   *
-   * @param id Vertex id
-   * @return Unique long key
-   */
-  protected abstract long vertexKeyFromId(I id);
-
-  @Override
-  public PartitionOwner createPartitionOwner() {
-    return new BasicPartitionOwner();
-  }
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    long rangeSize = keySpaceSize / partitionOwnerList.size();
-    return partitionOwnerList.get(
-        Math.min((int) (vertexKeyFromId(vertexId) / rangeSize),
-            partitionOwnerList.size() - 1));
-  }
-
-  @Override
-  public Collection<PartitionStats> finalizePartitionStats(
-      Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E> partitionStore) {
-    // No modification necessary
-    return workerPartitionStats;
-  }
-
-  @Override
-  public PartitionExchange updatePartitionOwners(
-      WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E> partitionStore) {
-    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
-        myWorkerInfo, masterSetPartitionOwners, partitionStore);
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return partitionOwnerList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
new file mode 100644
index 0000000..600d7a3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
+ * user function - getPartitionIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements WorkerGraphPartitioner<I, V, E> {
+  /** List of {@link PartitionOwner}s for this worker. */
+  private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    return partitionOwnerList.get(
+        getPartitionIndex(vertexId, partitionOwnerList.size()));
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E> partitionStore) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E> partitionStore) {
+    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
+        myWorkerInfo, masterSetPartitionOwners, partitionStore);
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  /**
+   * Calculates which partition the current vertex belongs to,
+   * from the interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
+  protected abstract int getPartitionIndex(I id, int partitionCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
new file mode 100644
index 0000000..4e19cd2
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/** Test {@link org.apache.giraph.partition.SimpleLongRangePartitionerFactory}. */
+public class SimpleRangePartitionFactoryTest {
+
+  private void testRange(int numWorkers, int keySpaceSize, int allowedWorkerDiff, boolean emptyWorkers) {
+    Configuration conf = new Configuration();
+    conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize);
+    SimpleLongRangePartitionerFactory<Writable, Writable> factory =
+        new SimpleLongRangePartitionerFactory<Writable, Writable>();
+    factory.setConf(new ImmutableClassesGiraphConfiguration(conf));
+
+    ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
+    for (int i = 0; i < numWorkers; i++) {
+      WorkerInfo info = new WorkerInfo();
+      info.setTaskId(i);
+      infos.add(info);
+    }
+
+    Collection<PartitionOwner> owners =
+        factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1);
+
+    int[] tasks = new int[owners.size()];
+    for (PartitionOwner owner : owners) {
+      WorkerInfo worker = owner.getWorkerInfo();
+      assertEquals(0, tasks[owner.getPartitionId()]);
+      tasks[owner.getPartitionId()] = worker.getTaskId() + 1;
+    }
+    checkMapping(tasks, allowedWorkerDiff, emptyWorkers);
+
+    WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
+        factory.createWorkerGraphPartitioner();
+    workerPartitioner.updatePartitionOwners(null, owners, null);
+    LongWritable longWritable = new LongWritable();
+
+    int[] partitions = new int[keySpaceSize];
+    for (int i = 0; i < keySpaceSize; i++) {
+      longWritable.set(i);
+      PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable);
+      partitions[i] = owner.getPartitionId();
+    }
+    checkMapping(partitions, 1, emptyWorkers);
+  }
+
+  private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) {
+    int prev = -1;
+
+    int max = 0;
+    int min = Integer.MAX_VALUE;
+    int cur = 0;
+    for (int value : mapping) {
+      if (value != prev) {
+        if (prev != -1) {
+          min = Math.min(cur, min);
+          max = Math.max(cur, max);
+          assertTrue(prev < value);
+          if (!emptyWorkers) {
+            assertEquals(prev + 1, value);
+          }
+        }
+        cur = 1;
+      } else {
+        cur++;
+      }
+      prev = value;
+    }
+    assertTrue(min + allowedDiff >= max);
+  }
+
+  @Test
+  public void testLongRangePartitionerFactory() {
+    // perfect distribution
+    testRange(10, 100000, 0, false);
+    testRange(1000, 100000, 0, false);
+
+    // perfect distribution even when max is hit, and max is not divisible by #workers
+    testRange(8949, 100023, 0, false);
+    testRange(1949, 211111, 0, false);
+
+    // imperfect distribution - because there are more workers than max partitions.
+    testRange(194942, 211111, 1, true);
+  }
+}


Mime
View raw message