giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [7/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
new file mode 100644
index 0000000..bdbd467
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Helper class for balancing partitions across a set of workers.
+ */
+public class PartitionBalancer {
+  /** Partition balancing algorithm */
+  public static final String PARTITION_BALANCE_ALGORITHM =
+    "hash.partitionBalanceAlgorithm";
+  /** No rebalancing during the supersteps */
+  public static final String STATIC_BALANCE_ALGORITHM =
+    "static";
+  /** Rebalance across supersteps by edges */
+  public static final String EGDE_BALANCE_ALGORITHM =
+    "edges";
+  /** Rebalance across supersteps by vertices */
+  public static final String VERTICES_BALANCE_ALGORITHM =
+    "vertices";
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
+
+  /**
+   * What value to balance partitions with?  Edges, vertices?
+   */
+  private enum BalanceValue {
+    /** Not chosen */
+    UNSET,
+    /** Balance with edges */
+    EDGES,
+    /** Balance with vertices */
+    VERTICES
+  }
+
+  /**
+   * Do not construct this class.
+   */
+  private PartitionBalancer() { }
+
+  /**
+   * Get the value used to balance.
+   *
+   * @param partitionStat Stats of this partition.
+   * @param balanceValue Type of the value to balance.
+   * @return Balance value.
+   */
+  private static long getBalanceValue(PartitionStats partitionStat,
+      BalanceValue balanceValue) {
+    switch (balanceValue) {
+    case EDGES:
+      return partitionStat.getEdgeCount();
+    case VERTICES:
+      return partitionStat.getVertexCount();
+    default:
+      throw new IllegalArgumentException(
+          "getBalanceValue: Illegal balance value " + balanceValue);
+    }
+  }
+
+  /**
+   * Used to sort the partition owners from lowest value to highest value
+   */
+  private static class PartitionOwnerComparator implements
+      Comparator<PartitionOwner> {
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatMap;
+    /** Value type to compare on */
+    private final BalanceValue balanceValue;
+
+
+    /**
+     * Only constructor.
+     *
+     * @param ownerStatMap Map of owners to stats.
+     * @param balanceValue Value to balance with.
+     */
+    public PartitionOwnerComparator(
+        Map<PartitionOwner, PartitionStats> ownerStatMap,
+        BalanceValue balanceValue) {
+      this.ownerStatMap = ownerStatMap;
+      this.balanceValue = balanceValue;
+    }
+
+    @Override
+    public int compare(PartitionOwner owner1, PartitionOwner owner2) {
+      return (int)
+          (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
+              getBalanceValue(ownerStatMap.get(owner2), balanceValue));
+    }
+  }
+
+  /**
+   * Structure to keep track of how much value a {@link WorkerInfo} has
+   * been assigned.
+   */
+  private static class WorkerInfoAssignments implements
+      Comparable<WorkerInfoAssignments> {
+    /** Worker info associated */
+    private final WorkerInfo workerInfo;
+    /** Balance value */
+    private final BalanceValue balanceValue;
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
+    /** Current value of this object */
+    private long value = 0;
+
+    /**
+     * Constructor with final values.
+     *
+     * @param workerInfo Worker info for assignment.
+     * @param balanceValue Value used to balance.
+     * @param ownerStatsMap Map of owner to stats.
+     */
+    public WorkerInfoAssignments(
+        WorkerInfo workerInfo,
+        BalanceValue balanceValue,
+        Map<PartitionOwner, PartitionStats> ownerStatsMap) {
+      this.workerInfo = workerInfo;
+      this.balanceValue = balanceValue;
+      this.ownerStatsMap = ownerStatsMap;
+    }
+
+    /**
+     * Get the total value of all partitions assigned to this worker.
+     *
+     * @return Total value of all partition assignments.
+     */
+    public long getValue() {
+      return value;
+    }
+
+    /**
+     * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
+     *
+     * @param partitionOwner PartitionOwner to assign.
+     */
+    public void assignPartitionOwner(
+        PartitionOwner partitionOwner) {
+      value += getBalanceValue(ownerStatsMap.get(partitionOwner),
+          balanceValue);
+      if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
+        partitionOwner.setPreviousWorkerInfo(
+            partitionOwner.getWorkerInfo());
+        partitionOwner.setWorkerInfo(workerInfo);
+      } else {
+        partitionOwner.setPreviousWorkerInfo(null);
+      }
+    }
+
+    @Override
+    public int compareTo(WorkerInfoAssignments other) {
+      return (int)
+          (getValue() - ((WorkerInfoAssignments) other).getValue());
+    }
+  }
+
+  /**
+   * Balance the partitions with an algorithm based on a value.
+   *
+   * @param conf Configuration to find the algorithm
+   * @param partitionOwners All the owners of all partitions
+   * @param allPartitionStats All the partition stats
+   * @param availableWorkerInfos All the available workers
+   * @return Balanced partition owners
+   */
+  public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
+      Configuration conf,
+      Collection<PartitionOwner> partitionOwners,
+      Collection<PartitionStats> allPartitionStats,
+      Collection<WorkerInfo> availableWorkerInfos) {
+
+    String balanceAlgorithm =
+        conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
+          balanceAlgorithm);
+    }
+    BalanceValue balanceValue = BalanceValue.UNSET;
+    if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
+      return partitionOwners;
+    } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.EDGES;
+    } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.VERTICES;
+    } else {
+      throw new IllegalArgumentException(
+          "balancePartitionsAcrossWorkers: Illegal balance " +
+              "algorithm - " + balanceAlgorithm);
+    }
+
+    // Join the partition stats and partition owners by partition id
+    Map<Integer, PartitionStats> idStatMap =
+        new HashMap<Integer, PartitionStats>();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
+          null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition id " +
+                "for " + partitionStats);
+      }
+    }
+    Map<PartitionOwner, PartitionStats> ownerStatsMap =
+        new HashMap<PartitionOwner, PartitionStats>();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      PartitionStats partitionStats =
+          idStatMap.get(partitionOwner.getPartitionId());
+      if (partitionStats == null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Missing partition " +
+                "stats for " + partitionOwner);
+      }
+      if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition " +
+                "owner " + partitionOwner);
+      }
+    }
+    if (ownerStatsMap.size() != partitionOwners.size()) {
+      throw new IllegalStateException(
+          "balancePartitionsAcrossWorkers: ownerStats count = " +
+              ownerStatsMap.size() + ", partitionOwners count = " +
+              partitionOwners.size() + " and should match.");
+    }
+
+    List<WorkerInfoAssignments> workerInfoAssignmentsList =
+        new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
+    for (WorkerInfo workerInfo : availableWorkerInfos) {
+      workerInfoAssignmentsList.add(
+          new WorkerInfoAssignments(
+              workerInfo, balanceValue, ownerStatsMap));
+    }
+
+    // A simple heuristic for balancing the partitions across the workers
+    // using a value (edges, vertices).  An improvement would be to
+    // take into account the already existing partition worker assignments.
+    // 1.  Sort the partitions by size
+    // 2.  Place the workers in a min heap sorted by their total balance
+    //     value.
+    // 3.  From largest partition to the smallest, take the partition
+    //     worker at the top of the heap, add the partition to it, and
+    //     then put it back in the heap
+    List<PartitionOwner> partitionOwnerList =
+        new ArrayList<PartitionOwner>(partitionOwners);
+    Collections.sort(partitionOwnerList,
+        Collections.reverseOrder(
+            new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
+    PriorityQueue<WorkerInfoAssignments> minQueue =
+        new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      WorkerInfoAssignments chosenWorker = minQueue.remove();
+      chosenWorker.assignPartitionOwner(partitionOwner);
+      minQueue.add(chosenWorker);
+    }
+
+    return partitionOwnerList;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionExchange.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionExchange.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionExchange.java
new file mode 100644
index 0000000..2ea62b8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionExchange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.worker.WorkerInfo;
+
+/**
+ * Describes what is required to send and wait for in a potential partition
+ * exchange between workers.
+ */
+public class PartitionExchange {
+  /** Workers that I am dependent on before I can continue */
+  private final Set<WorkerInfo> myDependencyWorkerSet;
+  /** Workers that I need to sent partitions to */
+  private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
+
+  /**
+   * Only constructor.
+   *
+   * @param myDependencyWorkerSet All the workers I must wait for
+   * @param sendWorkerPartitionMap Partitions I need to send to other workers
+   */
+  public PartitionExchange(
+      Set<WorkerInfo> myDependencyWorkerSet,
+      Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
+    this.myDependencyWorkerSet = myDependencyWorkerSet;
+    this.sendWorkerPartitionMap = sendWorkerPartitionMap;
+  }
+
+  /**
+   * Get the workers that I must wait for
+   *
+   * @return Set of workers I must wait for
+   */
+  public Set<WorkerInfo> getMyDependencyWorkerSet() {
+    return myDependencyWorkerSet;
+  }
+
+  /**
+   * Get a mapping of worker to list of partition ids I need to send to.
+   *
+   * @return Mapping of worker to partition id list I will send to.
+   */
+  public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
+    return sendWorkerPartitionMap;
+  }
+
+  /**
+   * Is this worker involved in a partition exchange?  Receiving or sending?
+   *
+   * @return True if needs to be involved in the exchange, false otherwise.
+   */
+  public boolean doExchange() {
+    return !myDependencyWorkerSet.isEmpty() ||
+        !sendWorkerPartitionMap.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
new file mode 100644
index 0000000..a886d79
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Metadata about ownership of a partition.
+ */
+public interface PartitionOwner extends Writable {
+  /**
+   * Get the partition id that maps to the relevant {@link Partition} object
+   *
+   * @return Partition id
+   */
+  int getPartitionId();
+
+  /**
+   * Get the worker information that is currently responsible for
+   * the partition id.
+   *
+   * @return Owning worker information.
+   */
+  WorkerInfo getWorkerInfo();
+
+  /**
+   * Set the current worker info.
+   *
+   * @param workerInfo Worker info responsible for partition
+   */
+  void setWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * Get the worker information that was previously responsible for the
+   * partition id.
+   *
+   * @return Owning worker information or null if no previous worker info.
+   */
+  WorkerInfo getPreviousWorkerInfo();
+
+  /**
+   * Set the previous worker info.
+   *
+   * @param workerInfo Worker info that was previously responsible for the
+   *        partition.
+   */
+  void setPreviousWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * If this is a restarted checkpoint, the worker will use this information
+   * to determine where the checkpointed partition was stored on HDFS.
+   *
+   * @return Prefix of the checkpoint HDFS files for this partition, null if
+   *         this is not a restarted superstep.
+   */
+  String getCheckpointFilesPrefix();
+
+  /**
+   * Set the checkpoint files prefix.  Master uses this.
+   *
+   * @param checkpointFilesPrefix HDFS checkpoint file prefix
+   */
+  void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
new file mode 100644
index 0000000..b8eeca9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Used to keep track of statistics of every {@link Partition}. Contains no
+ * actual partition data, only the statistics.
+ */
+public class PartitionStats implements Writable {
+  /** Id of partition to keep stats for */
+  private int partitionId = -1;
+  /** Vertices in this partition */
+  private long vertexCount = 0;
+  /** Finished vertices in this partition */
+  private long finishedVertexCount = 0;
+  /** Edges in this partition */
+  private long edgeCount = 0;
+  /** Messages sent from this partition */
+  private long messagesSentCount = 0;
+
+  /**
+   * Default constructor for reflection.
+   */
+  public PartitionStats() { }
+
+  /**
+   * Constructor with the initial stats.
+   *
+   * @param partitionId Partition count.
+   * @param vertexCount Vertex count.
+   * @param finishedVertexCount Finished vertex count.
+   * @param edgeCount Edge count.
+   * @param messagesSentCount Number of messages sent
+   */
+  public PartitionStats(int partitionId,
+      long vertexCount,
+      long finishedVertexCount,
+      long edgeCount,
+      long messagesSentCount) {
+    this.partitionId = partitionId;
+    this.vertexCount = vertexCount;
+    this.finishedVertexCount = finishedVertexCount;
+    this.edgeCount = edgeCount;
+    this.messagesSentCount = messagesSentCount;
+  }
+
+  /**
+   * Set the partition id.
+   *
+   * @param partitionId New partition id.
+   */
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Get partition id.
+   *
+   * @return Partition id.
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * Increment the vertex count by one.
+   */
+  public void incrVertexCount() {
+    ++vertexCount;
+  }
+
+  /**
+   * Get the vertex count.
+   *
+   * @return Vertex count.
+   */
+  public long getVertexCount() {
+    return vertexCount;
+  }
+
+  /**
+   * Increment the finished vertex count by one.
+   */
+  public void incrFinishedVertexCount() {
+    ++finishedVertexCount;
+  }
+
+  /**
+   * Get the finished vertex count.
+   *
+   * @return Finished vertex count.
+   */
+  public long getFinishedVertexCount() {
+    return finishedVertexCount;
+  }
+
+  /**
+   * Add edges to the edge count.
+   *
+   * @param edgeCount Number of edges to add.
+   */
+  public void addEdgeCount(long edgeCount) {
+    this.edgeCount += edgeCount;
+  }
+
+  /**
+   * Get the edge count.
+   *
+   * @return Edge count.
+   */
+  public long getEdgeCount() {
+    return edgeCount;
+  }
+
+  /**
+   * Add messages to the messages sent count.
+   *
+   * @param messagesSentCount Number of messages to add.
+   */
+  public void addMessagesSentCount(long messagesSentCount) {
+    this.messagesSentCount += messagesSentCount;
+  }
+
+  /**
+   * Get the messages sent count.
+   *
+   * @return Messages sent count.
+   */
+  public long getMessagesSentCount() {
+    return messagesSentCount;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    vertexCount = input.readLong();
+    finishedVertexCount = input.readLong();
+    edgeCount = input.readLong();
+    messagesSentCount = input.readLong();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeLong(vertexCount);
+    output.writeLong(finishedVertexCount);
+    output.writeLong(edgeCount);
+    output.writeLong(messagesSentCount);
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
+        finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
+        messagesSentCount + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
new file mode 100644
index 0000000..3e8dda9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Structure that stores partitions for a worker.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class PartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+
+  /**
+   * Add a new partition to the store or just the vertices from the partition
+   * to the old partition.
+   *
+   * @param partition Partition to add
+   */
+  public abstract void addPartition(Partition<I, V, E, M> partition);
+
+  /**
+   * Get a partition.
+   *
+   * @param partitionId Partition id
+   * @return The requested partition
+   */
+  public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
+
+  /**
+   * Remove a partition and return it.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition
+   */
+  public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
+
+  /**
+   * Just delete a partition
+   * (more efficient than {@link #removePartition(Integer partitionID)} if the
+   * partition is out of core).
+   *
+   * @param partitionId Partition id
+   */
+  public abstract void deletePartition(Integer partitionId);
+
+  /**
+   * Whether a specific partition is present in the store.
+   *
+   * @param partitionId Partition id
+   * @return True iff the partition is present
+   */
+  public abstract boolean hasPartition(Integer partitionId);
+
+  /**
+   * Return the ids of all the stored partitions as an Iterable.
+   *
+   * @return The partition ids
+   */
+  public abstract Iterable<Integer> getPartitionIds();
+
+  /**
+   * Return the number of stored partitions.
+   *
+   * @return The number of partitions
+   */
+  public abstract int getNumPartitions();
+
+  /**
+   * Whether the partition store is empty.
+   *
+   * @return True iff there are no partitions in the store
+   */
+  public boolean isEmpty() {
+    return getNumPartitions() == 0;
+  }
+
+  /**
+   * Return all the stored partitions as an Iterable.  Note that this may force
+   * out-of-core partitions to be loaded into memory if using out-of-core.
+   *
+   * @return The partition objects
+   */
+  public Iterable<Partition<I, V, E, M>> getPartitions() {
+    return Iterables.transform(getPartitionIds(),
+        new Function<Integer, Partition<I, V, E, M>>() {
+          @Override
+          public Partition<I, V, E, M> apply(Integer partitionId) {
+            return getPartition(partitionId);
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
new file mode 100644
index 0000000..e472ac6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper class for {@link Partition} related operations.
+ */
+public class PartitionUtils {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(PartitionUtils.class);
+
+  /**
+   * Do not construct this object.
+   */
+  private PartitionUtils() { }
+
+  /**
+   * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects.
+   */
+  private static class EdgeCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return (int) (worker1.getValue().getEdgeCount() -
+        worker2.getValue().getEdgeCount());
+    }
+  }
+
+  /**
+   * Compare vertex counts between a {@link WorkerInfo} and
+   * {@link VertexEdgeCount}.
+   */
+  private static class VertexCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return (int) (worker1.getValue().getVertexCount() -
+        worker2.getValue().getVertexCount());
+    }
+  }
+
+  /**
+   * Check for imbalances on a per worker basis, by calculating the
+   * mean, high and low workers by edges and vertices.
+   *
+   * @param partitionOwnerList List of partition owners.
+   * @param allPartitionStats All the partition stats.
+   */
+  public static void analyzePartitionStats(
+      Collection<PartitionOwner> partitionOwnerList,
+      List<PartitionStats> allPartitionStats) {
+    Map<Integer, PartitionOwner> idOwnerMap =
+        new HashMap<Integer, PartitionOwner>();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      if (idOwnerMap.put(partitionOwner.getPartitionId(),
+          partitionOwner) != null) {
+        throw new IllegalStateException(
+            "analyzePartitionStats: Duplicate partition " +
+                partitionOwner);
+      }
+    }
+
+    Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+    VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      WorkerInfo workerInfo =
+          idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
+      VertexEdgeCount vertexEdgeCount =
+          workerStatsMap.get(workerInfo);
+      if (vertexEdgeCount == null) {
+        workerStatsMap.put(
+            workerInfo,
+            new VertexEdgeCount(partitionStats.getVertexCount(),
+                partitionStats.getEdgeCount()));
+      } else {
+        workerStatsMap.put(
+            workerInfo,
+            vertexEdgeCount.incrVertexEdgeCount(
+                partitionStats.getVertexCount(),
+                partitionStats.getEdgeCount()));
+      }
+      totalVertexEdgeCount =
+          totalVertexEdgeCount.incrVertexEdgeCount(
+              partitionStats.getVertexCount(),
+              partitionStats.getEdgeCount());
+    }
+
+    List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+        Lists.newArrayList(workerStatsMap.entrySet());
+
+    if (LOG.isInfoEnabled()) {
+      Collections.sort(workerEntryList, new VertexCountComparator());
+      LOG.info("analyzePartitionStats: Vertices - Mean: " +
+          (totalVertexEdgeCount.getVertexCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getVertexCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getVertexCount());
+      Collections.sort(workerEntryList, new EdgeCountComparator());
+      LOG.info("analyzePartitionStats: Edges - Mean: " +
+          (totalVertexEdgeCount.getEdgeCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getEdgeCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getEdgeCount());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
new file mode 100644
index 0000000..f9b0329
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Some functionality is provided, but this is meant for developers to
+ * determine the partitioning based on the actual types of data.  The
+ * implementation of several methods are left to the developer who is trying
+ * to control the amount of messages sent from one worker to another.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new RangePartitionStats<I>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
new file mode 100644
index 0000000..1ecedb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Added the max key index in to the {@link PartitionOwner}.  Also can provide
+ * a split hint if desired.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionOwner<I extends WritableComparable>
+    extends BasicPartitionOwner {
+  /** Max index for this partition */
+  private I maxIndex;
+
+  /**
+   * Default constructor.
+   */
+  public RangePartitionOwner() { }
+
+  /**
+   * Constructor with the max index.
+   *
+   * @param maxIndex Max index of this partition.
+   */
+  public RangePartitionOwner(I maxIndex) {
+    this.maxIndex = maxIndex;
+  }
+
+  /**
+   * Get the maximum index of this partition owner.
+   *
+   * @return Maximum index.
+   */
+  public I getMaxIndex() {
+    return maxIndex;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    maxIndex = (I) getConf().createVertexId();
+    maxIndex.readFields(input);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    maxIndex.write(output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
new file mode 100644
index 0000000..73af816
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Same as {@link PartitionStats}, but also includes the hint for range-based
+ * partitioning.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionStats<I extends WritableComparable>
+    extends PartitionStats {
+  /** Can be null if no hint, otherwise a splitting hint */
+  private RangeSplitHint<I> hint;
+
+  /**
+   * Get the range split hint (if any)
+   *
+   * @return Hint of how to split the range if desired, null otherwise
+   */
+  public RangeSplitHint<I> getRangeSplitHint() {
+    return hint;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    boolean hintExists = input.readBoolean();
+    if (hintExists) {
+      hint = new RangeSplitHint<I>();
+      hint.readFields(input);
+    } else {
+      hint = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    output.writeBoolean(hint != null);
+    if (hint != null) {
+      hint.write(output);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
new file mode 100644
index 0000000..29f7898
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type.  This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent.  The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed.  Another negative is the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * See {@link RangeWorkerPartitioner}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
new file mode 100644
index 0000000..9634c33
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Hint to the {@link RangeMasterPartitioner} about how a
+ * {@link RangePartitionOwner} can be split.
+ *
+ * @param <I> Vertex index to split around
+ */
+@SuppressWarnings("rawtypes")
+public class RangeSplitHint<I extends WritableComparable>
+    implements Writable, ImmutableClassesGiraphConfigurable {
+  /** Hinted split index */
+  private I splitIndex;
+  /** Number of vertices in this range before the split */
+  private long preSplitVertexCount;
+  /** Number of vertices in this range after the split */
+  private long postSplitVertexCount;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    splitIndex = conf.createVertexId();
+    splitIndex.readFields(input);
+    preSplitVertexCount = input.readLong();
+    postSplitVertexCount = input.readLong();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    splitIndex.write(output);
+    output.writeLong(preSplitVertexCount);
+    output.writeLong(postSplitVertexCount);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
new file mode 100644
index 0000000..5a494a5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type.  This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent.  The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed.  Another negative is the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * Note:  This implementation is incomplete, the developer must implement the
+ * various methods based on their index type.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    WorkerGraphPartitioner<I, V, E, M> {
+  /** Mapping of the vertex ids to the {@link PartitionOwner} */
+  protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+      new TreeMap<I, RangePartitionOwner<I>>();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new RangePartitionOwner<I>();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // Find the partition owner based on the maximum partition id.
+    // If the vertex id exceeds any of the maximum partition ids, give
+    // it to the last one
+    if (vertexId == null) {
+      throw new IllegalArgumentException(
+          "getPartitionOwner: Illegal null vertex id");
+    }
+    I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
+    if (maxVertexIndex == null) {
+      return vertexRangeMap.lastEntry().getValue();
+    } else {
+      return vertexRangeMap.get(vertexId);
+    }
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return vertexRangeMap.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
new file mode 100644
index 0000000..479011f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A simple map-based container that stores vertices.  Vertex ids will map to
+ * exactly one partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SimplePartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /** Vertex map for this range (keyed by index) */
+  private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+
+  /**
+   * Constructor for reflection.
+   */
+  public SimplePartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+    } else {
+      vertexMap = Maps.newConcurrentMap();
+    }
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    return vertexMap.get(vertexIndex);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    return vertexMap.put(vertex.getId(), vertex);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    return vertexMap.remove(vertexIndex);
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    for (Vertex<I, V, E , M> vertex : partition) {
+      vertexMap.put(vertex.getId(), vertex);
+    }
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  @Override
+  public long getEdgeCount() {
+    long edges = 0;
+    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
+      edges += vertex.getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // No-op, vertices are stored as Java objects in this partition
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+    } else {
+      vertexMap = Maps.newConcurrentMap();
+    }
+    id = input.readInt();
+    int vertices = input.readInt();
+    for (int i = 0; i < vertices; ++i) {
+      Vertex<I, V, E, M> vertex = conf.createVertex();
+      if (progressable != null) {
+        progressable.progress();
+      }
+      vertex.readFields(input);
+      if (vertexMap.put(vertex.getId(), vertex) != null) {
+        throw new IllegalStateException(
+            "readFields: " + this +
+            " already has same id " + vertex);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Vertex vertex : vertexMap.values()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      vertex.write(output);
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return vertexMap.values().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
new file mode 100644
index 0000000..7bd0bb1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.google.common.collect.Maps;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple in-memory partition store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class SimplePartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Map of stored partitions. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
+      Maps.newConcurrentMap();
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Context used to report progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param context Mapper context
+   */
+  public SimplePartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.context = context;
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
+    if (oldPartition == null) {
+      oldPartition = partitions.putIfAbsent(partition.getId(), partition);
+      if (oldPartition == null) {
+        return;
+      }
+    }
+    oldPartition.addPartition(partition);
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    return partitions.get(partitionId);
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    return partitions.remove(partitionId);
+  }
+
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitions.remove(partitionId);
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return partitions.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return partitions.keySet();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return partitions.size();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
new file mode 100644
index 0000000..5a78b1d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Collection;
+
+/**
+ * Stores the {@link PartitionOwner} objects from the master and provides the
+ * mapping of vertex to {@link PartitionOwner}. Also generates the partition
+ * owner implementation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface WorkerGraphPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Instantiate the {@link PartitionOwner} implementation used to read the
+   * master assignments.
+   *
+   * @return Instantiated {@link PartitionOwner} object
+   */
+  PartitionOwner createPartitionOwner();
+
+  /**
+   * Figure out the owner of a vertex
+   *
+   * @param vertexId Vertex id to get the partition for
+   * @return Correct partition owner
+   */
+  PartitionOwner getPartitionOwner(I vertexId);
+
+  /**
+   * At the end of a superstep, workers have {@link PartitionStats} generated
+   * for each of their partitions.  This method will allow the user to
+   * modify or create their own {@link PartitionStats} interfaces to send to
+   * the master.
+   *
+   * @param workerPartitionStats Stats generated by the infrastructure during
+   *        the superstep
+   * @param partitionStore Partition store for this worker
+   *        (could be used to provide more useful stat information)
+   * @return Final partition stats
+   */
+  Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E, M> partitionStore);
+
+  /**
+   * Get the partitions owners and update locally.  Returns the partitions
+   * to send to other workers and other dependencies.
+   *
+   * @param myWorkerInfo Worker info.
+   * @param masterSetPartitionOwners Master set partition owners, received
+   *        prior to beginning the superstep
+   * @param partitionStore Partition store for this worker
+   *        (can be used to fill the return map of partitions to send)
+   * @return Information for the partition exchange.
+   */
+  PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E, M> partitionStore);
+
+  /**
+   * Get a collection of the {@link PartitionOwner} objects.
+   *
+   * @return Collection of owners for every partition.
+   */
+  Collection<? extends PartitionOwner> getPartitionOwners();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/package-info.java b/giraph-core/src/main/java/org/apache/giraph/partition/package-info.java
new file mode 100644
index 0000000..b0628d6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of partitioning related objects.
+ */
+package org.apache.giraph.partition;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/time/FakeTime.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/time/FakeTime.java b/giraph-core/src/main/java/org/apache/giraph/time/FakeTime.java
new file mode 100644
index 0000000..f02143f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/time/FakeTime.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.time;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe implementation of Time for testing that can help get time based
+ * ordering of events when desired.
+ */
+public class FakeTime implements Time {
+  /** Nanoseconds from the fake epoch */
+  private final AtomicLong nanosecondsSinceEpoch = new AtomicLong();
+
+  @Override
+  public long getMilliseconds() {
+    return nanosecondsSinceEpoch.get() / NS_PER_MS;
+  }
+
+  @Override
+  public long getMicroseconds() {
+    return nanosecondsSinceEpoch.get() / NS_PER_US;
+  }
+
+  @Override
+  public long getNanoseconds() {
+    return nanosecondsSinceEpoch.get();
+  }
+
+  @Override
+  public int getSeconds() {
+    return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
+  }
+
+  @Override
+  public Date getCurrentDate() {
+    return new Date(getMilliseconds());
+  }
+
+  @Override
+  public void sleep(long milliseconds) throws InterruptedException {
+    nanosecondsSinceEpoch.getAndAdd(milliseconds * NS_PER_MS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/time/SystemTime.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/time/SystemTime.java b/giraph-core/src/main/java/org/apache/giraph/time/SystemTime.java
new file mode 100644
index 0000000..f232f51
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/time/SystemTime.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.time;
+
+import java.util.Date;
+
+/**
+ * Implementation of Time that is thread-safe and should be used in
+ * production.
+ */
+public class SystemTime implements Time {
+  /**
+   * Single instance of this object
+   */
+  private static final SystemTime SINGLE_TIME = new SystemTime();
+
+  @Override
+  public long getMilliseconds() {
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  public long getMicroseconds() {
+    return getNanoseconds() / NS_PER_US;
+  }
+
+  @Override
+  public long getNanoseconds() {
+    return System.nanoTime();
+  }
+
+  @Override
+  public int getSeconds() {
+    return (int) (getMilliseconds() / MS_PER_SECOND);
+  }
+
+  @Override
+  public Date getCurrentDate() {
+    return new Date();
+  }
+
+  @Override
+  public void sleep(long milliseconds) throws InterruptedException {
+    Thread.sleep(milliseconds);
+  }
+
+  /**
+   * Get an instance (shared) of this object
+   *
+   * @return Instance of this object
+   */
+  public static Time get() {
+    return SINGLE_TIME;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/time/Time.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/time/Time.java b/giraph-core/src/main/java/org/apache/giraph/time/Time.java
new file mode 100644
index 0000000..b8216bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/time/Time.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.time;
+
+import java.util.Date;
+
+/**
+ * Interface for handling Time related operations so that they can be mocked
+ * for testing.
+ */
+public interface Time {
+  /** Microseconds per millisecond */
+  long US_PER_MS = 1000;
+  /** Nanoseconds per microsecond */
+  long NS_PER_US = 1000;
+  /** Nanoseconds per millisecond */
+  long NS_PER_MS = US_PER_MS * NS_PER_US;
+  /** Milliseconds per second */
+  long MS_PER_SECOND = 1000;
+  /** Milliseconds per second (as float) */
+  float MS_PER_SECOND_AS_FLOAT = MS_PER_SECOND * 1f;
+  /** Microseconds per second */
+  long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
+  /** Microseconds per second (as float) */
+  float US_PER_SECOND_AS_FLOAT = US_PER_SECOND * 1f;
+  /** Nanoseconds per second */
+  long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
+  /** Nanoseconds per second (as float) */
+  float NS_PER_SECOND_AS_FLOAT = NS_PER_SECOND * 1f;
+  /** Seconds per hour */
+  long SECONDS_PER_HOUR = 60 * 60;
+  /** Seconds per day */
+  long SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;
+  /** Milliseconds per hour */
+  long MS_PER_HOUR = SECONDS_PER_HOUR * MS_PER_SECOND;
+  /** Milliseconds per day */
+  long MS_PER_DAY = SECONDS_PER_DAY * MS_PER_SECOND;
+
+  /**
+   * Get the current milliseconds
+   *
+   * @return The difference, measured in milliseconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  long getMilliseconds();
+
+  /**
+   * Get the current microseconds
+   *
+   * @return The difference, measured in microseconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  long getMicroseconds();
+
+  /**
+   * Get the current nanoseconds
+   *
+   * @return The difference, measured in nanoseconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  long getNanoseconds();
+
+  /**
+   * Get the current seconds
+   *
+   * @return The difference, measured in seconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  int getSeconds();
+
+  /**
+   * Get the current date
+   *
+   * @return Current date
+   */
+  Date getCurrentDate();
+
+  /**
+   * Current thread should sleep for some number of milliseconds.
+   *
+   * @param milliseconds Milliseconds to sleep for
+   * @throws InterruptedException
+   */
+  void sleep(long milliseconds) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/time/Times.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/time/Times.java b/giraph-core/src/main/java/org/apache/giraph/time/Times.java
new file mode 100644
index 0000000..5752d18
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/time/Times.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.time;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility methods for Time classes.
+ */
+public class Times {
+  /** Do not instantiate */
+  private Times() { }
+
+  /**
+   * Convenience method to measure time in a given TimeUnit.
+   *
+   * @param time Time instance to use
+   * @param timeUnit TimeUnit to measure in
+   * @return long measured time in TimeUnit dimension
+   */
+  public static long get(Time time, TimeUnit timeUnit) {
+    return timeUnit.convert(time.getNanoseconds(), TimeUnit.NANOSECONDS);
+  }
+
+  /**
+   * Convenience method to get time since the beginning of an event in a given
+   * TimeUnit.
+   *
+   * @param time Time object used for measuring.
+   * @param timeUnit TimeUnit to use for dimension.
+   * @param startTime beginning time to diff against
+   * @return time elapsed since startTime in TimeUnit dimension.
+   */
+  public static long getDiff(Time time, TimeUnit timeUnit, long startTime) {
+    return get(time, timeUnit) - startTime;
+  }
+
+  /**
+   * Convenience method to get milliseconds since a previous milliseconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousMilliseconds Previous milliseconds
+   * @return Milliseconds elapsed since the previous milliseconds
+   */
+  public static long getMillisecondsSince(Time time,
+                                          long previousMilliseconds) {
+    return time.getMilliseconds() - previousMilliseconds;
+  }
+
+  /**
+   * Convenience method to get milliseconds since a previous milliseconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousMs Previous milliseconds
+   * @return Milliseconds elapsed since the previous milliseconds
+   */
+  public static long getMsSince(Time time, long previousMs) {
+    return getMillisecondsSince(time, previousMs);
+  }
+
+  /**
+   * Convenience method to get microseconds since a previous microseconds point.
+   *
+   * @param time Time instance to use
+   * @param previousMicros Previous microseconds
+   * @return Microseconds elapsed since the previous microseconds
+   */
+  public static long getMicrosSince(Time time, long previousMicros) {
+    return time.getMicroseconds() - previousMicros;
+  }
+
+  /**
+   * Convenience method to get nanoseconds since a previous nanoseconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousNanoseconds Previous nanoseconds
+   * @return Nanoseconds elapsed since the previous nanoseconds
+   */
+  public static long getNanosecondsSince(Time time, long previousNanoseconds) {
+    return time.getNanoseconds() - previousNanoseconds;
+  }
+
+  /**
+   * Convenience method to get nanoseconds since a previous nanoseconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousNanos Previous nanoseconds
+   * @return Nanoseconds elapsed since the previous nanoseconds
+   */
+  public static long getNanosSince(Time time, long previousNanos) {
+    return getNanosecondsSince(time, previousNanos);
+  }
+
+  /**
+   * Convenience method to get seconds since a previous seconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousSeconds Previous seconds
+   * @return Seconds elapsed since the previous seconds
+   */
+  public static int getSecondsSince(Time time, int previousSeconds) {
+    return time.getSeconds() - previousSeconds;
+  }
+
+  /**
+   * Convenience method to get seconds since a previous seconds
+   * point.
+   *
+   * @param time Time instance to use
+   * @param previousSec Previous seconds
+   * @return Seconds elapsed since the previous seconds
+   */
+  public static int getSecSince(Time time, int previousSec) {
+    return getSecondsSince(time, previousSec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/time/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/time/package-info.java b/giraph-core/src/main/java/org/apache/giraph/time/package-info.java
new file mode 100644
index 0000000..b185603
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/time/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Time related things.
+ */
+package org.apache.giraph.time;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java b/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
deleted file mode 100644
index f20c10b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Thread-safe implementation of Time for testing that can help get time based
- * ordering of events when desired.
- */
-public class FakeTime implements Time {
-  /** Nanoseconds from the fake epoch */
-  private final AtomicLong nanosecondsSinceEpoch = new AtomicLong();
-
-  @Override
-  public long getMilliseconds() {
-    return nanosecondsSinceEpoch.get() / NS_PER_MS;
-  }
-
-  @Override
-  public long getMicroseconds() {
-    return nanosecondsSinceEpoch.get() / NS_PER_US;
-  }
-
-  @Override
-  public long getNanoseconds() {
-    return nanosecondsSinceEpoch.get();
-  }
-
-  @Override
-  public int getSeconds() {
-    return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
-  }
-
-  @Override
-  public Date getCurrentDate() {
-    return new Date(getMilliseconds());
-  }
-
-  @Override
-  public void sleep(long milliseconds) throws InterruptedException {
-    nanosecondsSinceEpoch.getAndAdd(milliseconds * NS_PER_MS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index bccb827..6c7f34d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.io.GiraphFileInputFormat;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/SystemTime.java b/giraph-core/src/main/java/org/apache/giraph/utils/SystemTime.java
deleted file mode 100644
index 16d401c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/SystemTime.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.Date;
-
-/**
- * Implementation of Time that is thread-safe and should be used in
- * production.
- */
-public class SystemTime implements Time {
-  /**
-   * Single instance of this object
-   */
-  private static final SystemTime SINGLE_TIME = new SystemTime();
-
-  @Override
-  public long getMilliseconds() {
-    return System.currentTimeMillis();
-  }
-
-  @Override
-  public long getMicroseconds() {
-    return getNanoseconds() / NS_PER_US;
-  }
-
-  @Override
-  public long getNanoseconds() {
-    return System.nanoTime();
-  }
-
-  @Override
-  public int getSeconds() {
-    return (int) (getMilliseconds() / MS_PER_SECOND);
-  }
-
-  @Override
-  public Date getCurrentDate() {
-    return new Date();
-  }
-
-  @Override
-  public void sleep(long milliseconds) throws InterruptedException {
-    Thread.sleep(milliseconds);
-  }
-
-  /**
-   * Get an instance (shared) of this object
-   *
-   * @return Instance of this object
-   */
-  public static Time get() {
-    return SINGLE_TIME;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/utils/Time.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Time.java b/giraph-core/src/main/java/org/apache/giraph/utils/Time.java
deleted file mode 100644
index 0c2dd64..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/Time.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.Date;
-
-/**
- * Interface for handling Time related operations so that they can be mocked
- * for testing.
- */
-public interface Time {
-  /** Microseconds per millisecond */
-  long US_PER_MS = 1000;
-  /** Nanoseconds per microsecond */
-  long NS_PER_US = 1000;
-  /** Nanoseconds per millisecond */
-  long NS_PER_MS = US_PER_MS * NS_PER_US;
-  /** Milliseconds per second */
-  long MS_PER_SECOND = 1000;
-  /** Milliseconds per second (as float) */
-  float MS_PER_SECOND_AS_FLOAT = MS_PER_SECOND * 1f;
-  /** Microseconds per second */
-  long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
-  /** Microseconds per second (as float) */
-  float US_PER_SECOND_AS_FLOAT = US_PER_SECOND * 1f;
-  /** Nanoseconds per second */
-  long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
-  /** Nanoseconds per second (as float) */
-  float NS_PER_SECOND_AS_FLOAT = NS_PER_SECOND * 1f;
-  /** Seconds per hour */
-  long SECONDS_PER_HOUR = 60 * 60;
-  /** Seconds per day */
-  long SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;
-  /** Milliseconds per hour */
-  long MS_PER_HOUR = SECONDS_PER_HOUR * MS_PER_SECOND;
-  /** Milliseconds per day */
-  long MS_PER_DAY = SECONDS_PER_DAY * MS_PER_SECOND;
-
-  /**
-   * Get the current milliseconds
-   *
-   * @return The difference, measured in milliseconds, between
-   *         the current time and midnight, January 1, 1970 UTC.
-   */
-  long getMilliseconds();
-
-  /**
-   * Get the current microseconds
-   *
-   * @return The difference, measured in microseconds, between
-   *         the current time and midnight, January 1, 1970 UTC.
-   */
-  long getMicroseconds();
-
-  /**
-   * Get the current nanoseconds
-   *
-   * @return The difference, measured in nanoseconds, between
-   *         the current time and midnight, January 1, 1970 UTC.
-   */
-  long getNanoseconds();
-
-  /**
-   * Get the current seconds
-   *
-   * @return The difference, measured in seconds, between
-   *         the current time and midnight, January 1, 1970 UTC.
-   */
-  int getSeconds();
-
-  /**
-   * Get the current date
-   *
-   * @return Current date
-   */
-  Date getCurrentDate();
-
-  /**
-   * Current thread should sleep for some number of milliseconds.
-   *
-   * @param milliseconds Milliseconds to sleep for
-   * @throws InterruptedException
-   */
-  void sleep(long milliseconds) throws InterruptedException;
-}


Mime
View raw message