giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [19/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:31 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
new file mode 100644
index 0000000..5460a55
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import com.google.common.collect.MapMaker;
+
+import com.google.common.primitives.Ints;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array based partition.  Should reduce the amount of memory used since
+ * the entire graph is compressed into byte arrays.
+ *
+ * Every method that hands out a vertex deserializes into, and returns, one
+ * shared representative vertex object.  A returned vertex is therefore
+ * overwritten by the next such call (this includes iteration and
+ * {@link #getEdgeCount()}).  Callers must guarantee that only one thread at
+ * a time reads vertices from this partition.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ByteArrayPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /**
+   * Vertex map for this range (keyed by index).  Note that the byte[] is a
+   * serialized vertex with the first four bytes as the length of the vertex
+   * to read.
+   */
+  private ConcurrentMap<I, byte[]> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+  /** Representative vertex, reused as the target of every deserialization */
+  private Vertex<I, V, E, M> representativeVertex;
+  /** Use unsafe serialization */
+  private boolean useUnsafeSerialization;
+
+  /**
+   * Constructor for reflection.  The configuration must be supplied through
+   * {@link #setConf(ImmutableClassesGiraphConfiguration)} before
+   * {@link #initialize(int, Progressable)} or
+   * {@link #readFields(DataInput)} is called, since both read from it.
+   */
+  public ByteArrayPartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+  }
+
+  /**
+   * Deserialize a stored vertex into the shared representative vertex.
+   *
+   * @param vertexData Serialized vertex (size-prefixed byte array)
+   * @return The representative vertex, now holding the deserialized state
+   */
+  private Vertex<I, V, E, M> deserialize(byte[] vertexData) {
+    WritableUtils.readFieldsFromByteArrayWithSize(
+        vertexData, representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  /**
+   * Look up a vertex by id.
+   *
+   * @param vertexIndex Id of the vertex to fetch
+   * @return The representative vertex loaded with the stored data, or null
+   *         if this partition has no vertex with that id
+   */
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    byte[] vertexData = vertexMap.get(vertexIndex);
+    return vertexData == null ? null : deserialize(vertexData);
+  }
+
+  /**
+   * Store a vertex, replacing any vertex previously stored under its id.
+   *
+   * @param vertex Vertex to serialize and store
+   * @return The representative vertex loaded with the replaced data, or
+   *         null if no vertex was stored under that id
+   */
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    // Serialize before touching the representative vertex: the argument may
+    // be the representative vertex itself.
+    byte[] vertexData =
+        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+    byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
+    return oldVertexBytes == null ? null : deserialize(oldVertexBytes);
+  }
+
+  /**
+   * Remove a vertex by id.
+   *
+   * @param vertexIndex Id of the vertex to remove
+   * @return The representative vertex loaded with the removed data, or null
+   *         if this partition had no vertex with that id
+   */
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    byte[] vertexBytes = vertexMap.remove(vertexIndex);
+    return vertexBytes == null ? null : deserialize(vertexBytes);
+  }
+
+  /**
+   * Copy all serialized vertices of another partition into this one.  The
+   * byte arrays are shared with the source partition, not cloned.
+   *
+   * @param partition Partition to merge in; must be a ByteArrayPartition
+   * @throws IllegalStateException If the partition is of any other type
+   */
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    // Only work with other ByteArrayPartition instances
+    if (!(partition instanceof ByteArrayPartition)) {
+      throw new IllegalStateException("addPartition: Cannot add partition " +
+          "of type " + partition.getClass());
+    }
+
+    ByteArrayPartition<I, V, E, M> byteArrayPartition =
+        (ByteArrayPartition<I, V, E, M>) partition;
+    vertexMap.putAll(byteArrayPartition.vertexMap);
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  /**
+   * Count the edges of all vertices.  Every stored vertex is deserialized
+   * into the representative vertex to read its edge count.
+   *
+   * @return Total number of edges in this partition
+   */
+  @Override
+  public long getEdgeCount() {
+    long edges = 0;
+    for (byte[] vertexBytes : vertexMap.values()) {
+      edges += deserialize(vertexBytes).getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  /**
+   * Serialize a vertex back into the map under its id.
+   *
+   * @param vertex Vertex whose current state should be stored
+   */
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    I vertexId = vertex.getId();
+    // Reuse the old buffer whenever possible
+    byte[] oldVertexData = vertexMap.get(vertexId);
+    byte[] newVertexData;
+    if (oldVertexData != null) {
+      newVertexData = WritableUtils.writeToByteArrayWithSize(
+          vertex, oldVertexData, useUnsafeSerialization);
+    } else {
+      newVertexData = WritableUtils.writeToByteArrayWithSize(
+          vertex, useUnsafeSerialization);
+    }
+    vertexMap.put(vertexId, newVertexData);
+  }
+
+  /**
+   * Write the partition id, the vertex count, and then for each vertex its
+   * id, its data size and that many bytes of its serialized data.
+   *
+   * @param output Output to write to
+   * @throws IOException If writing to the output fails
+   */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      entry.getKey().write(output);
+      byte[] vertexData = entry.getValue();
+      // Note here that we are writing the size of the vertex data first
+      // as it is encoded in the first four bytes of the byte[]
+      int vertexDataSize;
+      if (useUnsafeSerialization) {
+        vertexDataSize = UnsafeByteArrayInputStream.getInt(vertexData, 0);
+      } else {
+        vertexDataSize = Ints.fromByteArray(vertexData);
+      }
+
+      output.writeInt(vertexDataSize);
+      output.write(vertexData, 0, vertexDataSize);
+    }
+  }
+
+  /**
+   * Rebuild this partition from the format produced by
+   * {@link #write(DataOutput)}.  The vertex map, the representative vertex
+   * and the serialization mode are all re-created from the configuration.
+   *
+   * @param input Input to read from
+   * @throws IOException If reading from the input fails
+   * @throws IllegalStateException If the same vertex id is read twice
+   */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    int size = input.readInt();
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).initialCapacity(
+        size).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+    for (int i = 0; i < size; ++i) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      I vertexId = conf.createVertexId();
+      vertexId.readFields(input);
+      int vertexDataSize = input.readInt();
+      byte[] vertexData = new byte[vertexDataSize];
+      input.readFully(vertexData);
+      if (vertexMap.put(vertexId, vertexData) != null) {
+        throw new IllegalStateException("readFields: Already saw vertex " +
+            vertexId);
+      }
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  /**
+   * Iterate over all vertices.  Each call to next() returns the same
+   * representative vertex object, reloaded with the next vertex's data.
+   *
+   * @return Iterator over the vertices of this partition
+   */
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return new RepresentativeVertexIterator();
+  }
+
+  /**
+   * Iterator that deserializes a vertex from a byte array on the fly, using
+   * the same representative vertex object.
+   */
+  private class RepresentativeVertexIterator implements
+      Iterator<Vertex<I, V, E, M>> {
+    /** Iterator to the vertex values */
+    private final Iterator<byte[]> vertexDataIterator =
+        vertexMap.values().iterator();
+
+    @Override
+    public boolean hasNext() {
+      return vertexDataIterator.hasNext();
+    }
+
+    @Override
+    public Vertex<I, V, E, M> next() {
+      return deserialize(vertexDataIterator.next());
+    }
+
+    @Override
+    public void remove() {
+      // The Iterator contract specifies UnsupportedOperationException for
+      // an unsupported remove; IllegalAccessError is a JVM linkage error.
+      throw new UnsupportedOperationException(
+          "remove: This method is not supported.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
new file mode 100644
index 0000000..1e483d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A partition store that can possibly spill to disk.
+ *
+ * Partitions are kept in an in-memory map while there is room.  Further
+ * partitions are written to one file each under a local directory, and at
+ * most one of those out-of-core partitions is held in memory at a time in
+ * the "loaded partition" slot.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedPartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(DiskBackedPartitionStore.class);
+  /** Map of partitions kept in memory. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>>
+  inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
+  /** Maximum number of partitions to keep in memory. */
+  private final int maxInMemoryPartitions;
+  /** Map of partitions kept out-of-core. The values are partition sizes. */
+  private final ConcurrentMap<Integer, Integer> onDiskPartitions =
+      Maps.newConcurrentMap();
+  /** Directory on the local file system for storing out-of-core partitions. */
+  private final String basePath;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Slot for loading out-of-core partitions. */
+  private Partition<I, V, E, M> loadedPartition;
+  /**
+   * Locks for accessing and modifying partitions.  The key set of this map
+   * is also what {@link #hasPartition(Integer)} and
+   * {@link #getNumPartitions()} consult.
+   */
+  private final ConcurrentMap<Integer, Lock> partitionLocks =
+      Maps.newConcurrentMap();
+  /** Context used to report progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param context Mapper context
+   */
+  public DiskBackedPartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.context = context;
+    // We must be able to hold at least one partition in memory
+    maxInMemoryPartitions = Math.max(1,
+        conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
+            GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+    basePath = conf.get("mapred.job.id", "Unknown Job") +
+        conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
+            GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+  }
+
+  /**
+   * Get the path to the file where a partition is stored.
+   *
+   * @param partitionId The partition
+   * @return The path to the given partition
+   */
+  private String getPartitionPath(Integer partitionId) {
+    return basePath + "/partition-" + partitionId;
+  }
+
+  /**
+   * Create a new lock for a partition, lock it, and return it. If already
+   * existing, return null.
+   *
+   * @param partitionId Partition id
+   * @return A newly created lock (held by the caller), or null if a lock
+   *         for this partition was already registered
+   */
+  private Lock createLock(Integer partitionId) {
+    Lock lock = new ReentrantLock(true);
+    // Acquire before publishing so that no other thread can take the lock
+    // between registration and the caller's critical section.
+    lock.lock();
+    if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
+      return null;
+    }
+    return lock;
+  }
+
+  /**
+   * Get the lock for a partition id.
+   *
+   * @param partitionId Partition id
+   * @return The lock, or null if none is registered for this partition
+   */
+  private Lock getLock(Integer partitionId) {
+    return partitionLocks.get(partitionId);
+  }
+
+  /**
+   * Write all vertices of a partition to an already opened file stream and
+   * close the stream, even if serialization fails.
+   *
+   * @param partition Partition whose vertices are written
+   * @param fileStream Destination stream; always closed on return
+   * @throws IOException If writing or closing fails
+   */
+  private void writeVerticesAndClose(Partition<I, V, E, M> partition,
+      FileOutputStream fileStream) throws IOException {
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(fileStream));
+    try {
+      for (Vertex<I, V, E, M> vertex : partition) {
+        vertex.write(outputStream);
+      }
+    } finally {
+      outputStream.close();
+    }
+  }
+
+  /**
+   * Write a partition to disk, overwriting any file already stored for it.
+   *
+   * @param partition The partition object to write
+   * @throws java.io.IOException
+   */
+  private void writePartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    File file = new File(getPartitionPath(partition.getId()));
+    file.getParentFile().mkdirs();
+    file.createNewFile();
+    writeVerticesAndClose(partition, new FileOutputStream(file));
+  }
+
+  /**
+   * Read a partition from disk.  The number of vertices to read is taken
+   * from the out-of-core partition map, and the file is deleted once it has
+   * been read completely.
+   *
+   * @param partitionId Id of the partition to read
+   * @return The partition object
+   * @throws IOException
+   */
+  private Partition<I, V, E, M> readPartition(Integer partitionId)
+    throws IOException {
+    Partition<I, V, E, M> partition =
+        conf.createPartition(partitionId, context);
+    File file = new File(getPartitionPath(partitionId));
+    DataInputStream inputStream = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file)));
+    try {
+      int numVertices = onDiskPartitions.get(partitionId);
+      for (int i = 0; i < numVertices; ++i) {
+        Vertex<I, V, E, M> vertex = conf.createVertex();
+        vertex.readFields(inputStream);
+        partition.putVertex(vertex);
+      }
+    } finally {
+      // Release the file handle even when deserialization fails
+      inputStream.close();
+    }
+    file.delete();
+    return partition;
+  }
+
+  /**
+   * Append some vertices of another partition to an out-of-core partition.
+   *
+   * @param partition Partition to add
+   * @throws IOException
+   */
+  private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
+    throws IOException {
+    File file = new File(getPartitionPath(partition.getId()));
+    writeVerticesAndClose(partition, new FileOutputStream(file, true));
+  }
+
+  /**
+   * Load an out-of-core partition in memory.  If a different partition
+   * currently occupies the loaded slot, it is written back to disk first.
+   *
+   * @param partitionId Partition id
+   * @throws IllegalStateException If writing the evicted partition or
+   *         reading the requested one fails
+   */
+  private void loadPartition(Integer partitionId) {
+    if (loadedPartition != null) {
+      if (loadedPartition.getId() == partitionId) {
+        return;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
+            " out of core with size " + loadedPartition.getVertexCount());
+      }
+      try {
+        writePartition(loadedPartition);
+        onDiskPartitions.put(loadedPartition.getId(),
+            (int) loadedPartition.getVertexCount());
+        loadedPartition = null;
+      } catch (IOException e) {
+        throw new IllegalStateException("loadPartition: failed writing " +
+            "partition " + loadedPartition.getId() + " to disk", e);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadPartition: loading partition " + partitionId +
+          " in memory");
+    }
+    try {
+      loadedPartition = readPartition(partitionId);
+    } catch (IOException e) {
+      // Keep the cause so the underlying I/O failure is not lost
+      throw new IllegalStateException("loadPartition: failed reading " +
+          "partition " + partitionId + " from disk", e);
+    }
+  }
+
+  /**
+   * Add a new partition without requiring a lock.  The partition is kept in
+   * memory if there is room, otherwise it is written to disk.
+   *
+   * @param partition Partition to be added
+   * @throws IllegalStateException If writing the partition to disk fails
+   */
+  private void addPartitionNoLock(Partition<I, V, E, M> partition) {
+    synchronized (inMemoryPartitions) {
+      if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
+        inMemoryPartitions.put(partition.getId(), partition);
+
+        return;
+      }
+    }
+    try {
+      writePartition(partition);
+      onDiskPartitions.put(partition.getId(),
+          (int) partition.getVertexCount());
+    } catch (IOException e) {
+      throw new IllegalStateException("addPartition: failed writing " +
+          "partition " + partition.getId() + " to disk", e);
+    }
+  }
+
+  /**
+   * Add a partition to the store.  If a partition with the same id already
+   * exists, the vertices are merged into it: in memory, in the loaded slot,
+   * or by appending to its file on disk.
+   *
+   * @param partition Partition to add
+   * @throws IllegalStateException If writing to disk fails
+   */
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    Partition<I, V, E, M> existingPartition =
+        inMemoryPartitions.get(partition.getId());
+    if (existingPartition != null) {
+      existingPartition.addPartition(partition);
+    } else if (onDiskPartitions.containsKey(partition.getId())) {
+      Lock lock = getLock(partition.getId());
+      lock.lock();
+      try {
+        if (loadedPartition != null && loadedPartition.getId() ==
+            partition.getId()) {
+          loadedPartition.addPartition(partition);
+        } else {
+          try {
+            appendPartitionOutOfCore(partition);
+            onDiskPartitions.put(partition.getId(),
+                onDiskPartitions.get(partition.getId()) +
+                    (int) partition.getVertexCount());
+          } catch (IOException e) {
+            throw new IllegalStateException("addPartition: failed " +
+                "writing vertices to partition " + partition.getId() +
+                " on disk", e);
+          }
+        }
+      } finally {
+        // Always release, otherwise a failure would block this partition
+        // for every later caller.
+        lock.unlock();
+      }
+    } else {
+      Lock lock = createLock(partition.getId());
+      if (lock != null) {
+        try {
+          addPartitionNoLock(partition);
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        // Another thread is already creating the partition,
+        // so we make sure it's done before repeating the call.
+        lock = getLock(partition.getId());
+        lock.lock();
+        lock.unlock();
+        addPartition(partition);
+      }
+    }
+  }
+
+  /**
+   * Get a partition, loading it from disk into the loaded slot if needed.
+   *
+   * @param partitionId Partition id
+   * @return The partition
+   * @throws IllegalStateException If the partition is neither in memory nor
+   *         on disk
+   */
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    Partition<I, V, E, M> inMemoryPartition =
+        inMemoryPartitions.get(partitionId);
+    if (inMemoryPartition != null) {
+      return inMemoryPartition;
+    } else if (onDiskPartitions.containsKey(partitionId)) {
+      loadPartition(partitionId);
+      return loadedPartition;
+    } else {
+      throw new IllegalStateException("getPartition: partition " +
+          partitionId + " does not exist");
+    }
+  }
+
+  /**
+   * Remove a partition from the store and return it.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition, or null if it was not in the store
+   * @throws IllegalStateException If reading the partition from disk fails
+   */
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (onDiskPartitions.containsKey(partitionId)) {
+      Partition<I, V, E, M> partition;
+      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+        partition = loadedPartition;
+        loadedPartition = null;
+      } else {
+        try {
+          partition = readPartition(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("removePartition: failed reading " +
+              "partition " + partitionId + " from disk", e);
+        }
+      }
+      onDiskPartitions.remove(partitionId);
+      return partition;
+    } else {
+      return inMemoryPartitions.remove(partitionId);
+    }
+  }
+
+  /**
+   * Drop a partition without returning it.  For a partition that is not in
+   * memory, either the loaded slot is cleared or its file is deleted.
+   *
+   * @param partitionId Partition id
+   */
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (inMemoryPartitions.remove(partitionId) == null) {
+      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+        loadedPartition = null;
+      } else {
+        File file = new File(getPartitionPath(partitionId));
+        file.delete();
+      }
+      onDiskPartitions.remove(partitionId);
+    }
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return partitionLocks.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return Iterables.concat(inMemoryPartitions.keySet(),
+        onDiskPartitions.keySet());
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return partitionLocks.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
new file mode 100644
index 0000000..a7ac84b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for the two halves of an application's partitioning scheme: the
+ * partitioner run by the master and the partitioner run by each worker.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface GraphPartitionerFactory<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends ImmutableClassesGiraphConfigurable {
+  /**
+   * Build the master-side partitioner ({@link MasterGraphPartitioner}).
+   * The master instantiates it once and keeps reusing that instance.
+   *
+   * @return Instantiated master graph partitioner
+   */
+  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+
+  /**
+   * Build the worker-side partitioner ({@link WorkerGraphPartitioner}).
+   * Each worker instantiates it once and keeps reusing that instance.
+   *
+   * @return Instantiated worker graph partitioner
+   */
+  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
new file mode 100644
index 0000000..6724e50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Master will execute a hash based partitioning.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  /**
+   * Multiplier for the current workers squared.  The misspelling in the key
+   * ("Multipler") is part of the configuration name and must be preserved.
+   */
+  public static final String PARTITION_COUNT_MULTIPLIER =
+    "hash.masterPartitionCountMultipler";
+  /** Default multiplier for current workers squared */
+  public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+  /** Overrides default partition count calculation if not -1 */
+  public static final String USER_PARTITION_COUNT =
+    "hash.userPartitionCount";
+  /** Default user partition count */
+  public static final int DEFAULT_USER_PARTITION_COUNT = -1;
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(HashMasterPartitioner.class);
+  /**
+   * ZooKeeper limits the data in a single znode to 1 MB, and each entry can
+   * on average be somewhat more than 300 bytes.
+   */
+  private static final int MAX_PARTITIONS = 1024 * 1024 / 350;
+  /** Provided configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Specified partition count (overrides calculation) */
+  private final int userPartitionCount;
+  /** Partition count (calculated in createInitialPartitionOwners) */
+  private int partitionCount = -1;
+  /** Save the last generated partition owner list */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration used.
+   */
+  public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
+        DEFAULT_USER_PARTITION_COUNT);
+  }
+
+  /**
+   * Decide the partition count and assign the partitions to the available
+   * workers in round-robin order.  The count is the user supplied value if
+   * one was configured, otherwise multiplier * workers * workers (at least
+   * 1); either way it is capped at the maximum that fits in a znode.
+   *
+   * @param availableWorkerInfos Workers that can own partitions
+   * @param maxWorkers Maximum number of workers (not used here)
+   * @return Newly created list of partition owners
+   * @throws IllegalArgumentException If there are no available workers
+   */
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    if (availableWorkerInfos.isEmpty()) {
+      throw new IllegalArgumentException(
+          "createInitialPartitionOwners: No available workers");
+    }
+    int workerCount = availableWorkerInfos.size();
+    if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
+      float multiplier = conf.getFloat(
+          PARTITION_COUNT_MULTIPLIER,
+          DEFAULT_PARTITION_COUNT_MULTIPLIER);
+      partitionCount =
+          Math.max((int) (multiplier * workerCount * workerCount), 1);
+    } else {
+      partitionCount = userPartitionCount;
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createInitialPartitionOwners: Creating " +
+        partitionCount + ", default would have been " +
+        (workerCount * workerCount) + " partitions.");
+    }
+    if (partitionCount > MAX_PARTITIONS) {
+      LOG.warn("createInitialPartitionOwners: " +
+          "Reducing the partitionCount to " + MAX_PARTITIONS +
+          " from " + partitionCount);
+      partitionCount = MAX_PARTITIONS;
+    }
+
+    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
+    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+    for (int i = 0; i < partitionCount; ++i) {
+      ownerList.add(new BasicPartitionOwner(i, workerIt.next()));
+      // Wrap around to the first worker once every worker got a partition
+      if (!workerIt.hasNext()) {
+        workerIt = availableWorkerInfos.iterator();
+      }
+    }
+    this.partitionOwnerList = ownerList;
+    return ownerList;
+  }
+
+  /**
+   * Get the partition owners produced by the last call to
+   * {@link #createInitialPartitionOwners(Collection, int)} or set through
+   * {@link #setPartitionOwnerList(List)}.
+   *
+   * @return Current partition owner list, or null if none was set yet
+   */
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  /**
+   * Subclasses can set the partition owner list.
+   *
+   * @param partitionOwnerList New partition owner list.
+   */
+  protected void setPartitionOwnerList(List<PartitionOwner>
+  partitionOwnerList) {
+    this.partitionOwnerList = partitionOwnerList;
+  }
+
+  /**
+   * Delegate rebalancing to {@link PartitionBalancer}, passing the current
+   * owner list, the partition stats and the available workers.
+   *
+   * @param allPartitionStatsList Stats of all partitions
+   * @param availableWorkerInfos Workers that can own partitions
+   * @param maxWorkers Maximum number of workers (not used here)
+   * @param superstep Current superstep (not used here)
+   * @return Partition owners as returned by the balancer
+   */
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkerInfos,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(
+        conf,
+        partitionOwnerList,
+        allPartitionStatsList,
+        availableWorkerInfos);
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
new file mode 100644
index 0000000..69e7a5e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Divides the vertices into partitions by their hash code using a simple
+ * round-robin hash for great balancing if given a random hash code.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+  /**
+   * Saved configuration.  Declared with the factory's own type parameters
+   * (rather than as a raw type) so that it matches
+   * {@link HashRangePartitionerFactory}.
+   */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /**
+   * Create the master-side partitioner.
+   *
+   * @return New {@link HashMasterPartitioner} built from {@link #getConf()}
+   */
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
+
+  /**
+   * Create the worker-side partitioner.
+   *
+   * @return New {@link HashWorkerPartitioner}
+   */
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    return new HashWorkerPartitioner<I, V, E, M>();
+  }
+
+  /**
+   * Get the saved configuration.
+   *
+   * @return Configuration last passed to {@link #setConf}, or null if none
+   *         has been set
+   */
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Save the configuration for later use by the created partitioners.
+   *
+   * @param conf Configuration to save
+   */
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
new file mode 100644
index 0000000..df6457b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Divides the vertices into partitions by their hash code using ranges of the
+ * hash space.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+  /** Saved configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
+
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    return new HashRangeWorkerPartitioner<I, V, E, M>();
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
new file mode 100644
index 0000000..ea2cf66
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.primitives.UnsignedInts;
+
+/**
+ * Worker partitioner that picks the partition owner from the range of the
+ * hash space the vertex id's hash code falls into.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangeWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashWorkerPartitioner<I, V, E, M> {
+  /**
+   * Exclusive upper bound (2^32) for a hash code once it has been
+   * reinterpreted as an unsigned value.
+   */
+  private static final long HASH_LIMIT = 1L << Integer.SIZE;
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // Reinterpret the signed hash as unsigned so it lies in [0, HASH_LIMIT).
+    long hashAsUnsigned = UnsignedInts.toLong(vertexId.hashCode());
+    long ownerCount = getPartitionOwners().size();
+    // Scale the hash proportionally onto [0, ownerCount): a hash of 0 maps
+    // to index 0 and a hash of HASH_LIMIT - 1 maps to index ownerCount - 1.
+    long scaledHash = hashAsUnsigned * ownerCount;
+    int ownerIndex = (int) (scaledHash / HASH_LIMIT);
+    return partitionOwnerList.get(ownerIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
new file mode 100644
index 0000000..a76f803
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implements hash-based partitioning from the id hash code.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerGraphPartitioner<I, V, E, M> {
+  /**
+   * Mapping of the vertex ids to {@link PartitionOwner}.
+   */
+  protected List<PartitionOwner> partitionOwnerList =
+      Lists.newArrayList();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    return partitionOwnerList.get(
+        Math.abs(vertexId.hashCode() % partitionOwnerList.size()));
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E, M> partitionStore) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E, M> partitionStore) {
+    partitionOwnerList.clear();
+    partitionOwnerList.addAll(masterSetPartitionOwners);
+
+    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+        new HashMap<WorkerInfo, List<Integer>>();
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getPreviousWorkerInfo() == null) {
+        continue;
+      } else if (partitionOwner.getWorkerInfo().equals(
+          myWorkerInfo) &&
+          partitionOwner.getPreviousWorkerInfo().equals(
+              myWorkerInfo)) {
+        throw new IllegalStateException(
+            "updatePartitionOwners: Impossible to have the same " +
+                "previous and current worker info " + partitionOwner +
+                " as me " + myWorkerInfo);
+      } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+        dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+      } else if (partitionOwner.getPreviousWorkerInfo().equals(
+          myWorkerInfo)) {
+        if (workerPartitionOwnerMap.containsKey(
+            partitionOwner.getWorkerInfo())) {
+          workerPartitionOwnerMap.get(
+              partitionOwner.getWorkerInfo()).add(
+                  partitionOwner.getPartitionId());
+        } else {
+          List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
+          tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
+          workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+                                      tmpPartitionOwnerList);
+        }
+      }
+    }
+
+    return new PartitionExchange(dependentWorkerSet,
+        workerPartitionOwnerMap);
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
new file mode 100644
index 0000000..e911303
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.graph.WorkerInfo;
+
+/**
+ * Determines how to divide the graph into partitions, how to manipulate
+ * partitions and then how to assign those partitions to workers.  An
+ * implementation creates the initial {@link PartitionOwner} assignments,
+ * can report which owners change for a given superstep, and exposes the
+ * current assignments.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface MasterGraphPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Set some initial partition owners for the graph. Guaranteed to be called
+   * prior to the graph being loaded (initial or restart).
+   *
+   * @param availableWorkerInfos Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @return Collection of generated partition owners.
+   */
+  Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
+
+  /**
+   * After the worker stats have been merged to a single list, the master can
+   * use this information to send commands to the workers for any
+   * {@link Partition} changes. This protocol is specific to the
+   * {@link MasterGraphPartitioner} implementation.
+   *
+   * @param allPartitionStatsList All partition stats from all workers.
+   * @param availableWorkers Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @param superstep Partition owners will be set for this superstep
+   * @return Collection of {@link PartitionOwner} objects that changed from
+   *         the previous superstep, empty list if no change.
+   */
+  Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep);
+
+  /**
+   * Get the partition owners as they are currently assigned.
+   *
+   * @return Collection of current {@link PartitionOwner} objects
+   */
+  Collection<PartitionOwner> getCurrentPartitionOwners();
+
+  /**
+   * Instantiate the {@link PartitionStats} implementation used to read the
+   * worker stats.
+   *
+   * @return Instantiated {@link PartitionStats} object
+   */
+  PartitionStats createPartitionStats();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
new file mode 100644
index 0000000..b0f156f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A generic container that stores vertices.  Vertex ids will map to exactly
+ * one partition.  A partition is {@link Writable}, configurable, and can be
+ * iterated over vertex by vertex.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface Partition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
+    Iterable<Vertex<I, V, E, M>> {
+  /**
+   * Initialize the partition.  Guaranteed to be called before used.
+   *
+   * @param partitionId Partition id
+   * @param progressable Progressable to call progress
+   */
+  void initialize(int partitionId, Progressable progressable);
+
+  /**
+   * Get the vertex for this vertex index.
+   *
+   * @param vertexIndex Vertex index to search for
+   * @return Vertex if it exists, null otherwise
+   */
+  Vertex<I, V, E, M> getVertex(I vertexIndex);
+
+  /**
+   * Put a vertex into the Partition.
+   *
+   * @param vertex Vertex to put in the Partition
+   * @return old vertex value (i.e. null if none existed prior)
+   */
+  Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
+
+  /**
+   * Remove a vertex from the Partition.
+   *
+   * @param vertexIndex Vertex index to remove
+   * @return The removed vertex.
+   */
+  Vertex<I, V, E, M> removeVertex(I vertexIndex);
+
+  /**
+   * Add all of another partition's vertices to this partition.
+   *
+   * @param partition Partition whose vertices are added
+   */
+  void addPartition(Partition<I, V, E, M> partition);
+
+  /**
+   * Get the number of vertices in this partition.
+   *
+   * @return Number of vertices
+   */
+  long getVertexCount();
+
+  /**
+   * Get the number of edges in this partition.
+   *
+   * @return Number of edges.
+   */
+  long getEdgeCount();
+
+  /**
+   * Get the partition id.
+   *
+   * @return Id of this partition.
+   */
+  int getId();
+
+  /**
+   * Set the partition id.
+   *
+   * @param id Id of this partition
+   */
+  void setId(int id);
+
+  /**
+   * Set the progressable used by this partition.
+   *
+   * @param progressable Progressable
+   */
+  void setProgressable(Progressable progressable);
+
+  /**
+   * Save potentially modified vertex back to the partition.
+   *
+   * @param vertex Vertex to save
+   */
+  void saveVertex(Vertex<I, V, E, M> vertex);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
new file mode 100644
index 0000000..2d1c2a2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Helper class for balancing partitions across a set of workers.
+ */
+public class PartitionBalancer {
+  /** Partition balancing algorithm */
+  public static final String PARTITION_BALANCE_ALGORITHM =
+    "hash.partitionBalanceAlgorithm";
+  /** No rebalancing during the supersteps */
+  public static final String STATIC_BALANCE_ALGORITHM =
+    "static";
+  /** Rebalance across supersteps by edges */
+  public static final String EGDE_BALANCE_ALGORITHM =
+    "edges";
+  /** Rebalance across supersteps by vertices */
+  public static final String VERTICES_BALANCE_ALGORITHM =
+    "vertices";
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
+
+  /**
+   * What value to balance partitions with?  Edges, vertices?
+   */
+  private enum BalanceValue {
+    /** Not chosen */
+    UNSET,
+    /** Balance with edges */
+    EDGES,
+    /** Balance with vertices */
+    VERTICES
+  }
+
+  /**
+   * Do not construct this class.
+   */
+  private PartitionBalancer() { }
+
+  /**
+   * Get the value used to balance.
+   *
+   * @param partitionStat Stats of this partition.
+   * @param balanceValue Type of the value to balance.
+   * @return Balance value.
+   */
+  private static long getBalanceValue(PartitionStats partitionStat,
+      BalanceValue balanceValue) {
+    switch (balanceValue) {
+    case EDGES:
+      return partitionStat.getEdgeCount();
+    case VERTICES:
+      return partitionStat.getVertexCount();
+    default:
+      throw new IllegalArgumentException(
+          "getBalanceValue: Illegal balance value " + balanceValue);
+    }
+  }
+
+  /**
+   * Used to sort the partition owners from lowest value to highest value
+   */
+  private static class PartitionOwnerComparator implements
+      Comparator<PartitionOwner> {
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatMap;
+    /** Value type to compare on */
+    private final BalanceValue balanceValue;
+
+
+    /**
+     * Only constructor.
+     *
+     * @param ownerStatMap Map of owners to stats.
+     * @param balanceValue Value to balance with.
+     */
+    public PartitionOwnerComparator(
+        Map<PartitionOwner, PartitionStats> ownerStatMap,
+        BalanceValue balanceValue) {
+      this.ownerStatMap = ownerStatMap;
+      this.balanceValue = balanceValue;
+    }
+
+    @Override
+    public int compare(PartitionOwner owner1, PartitionOwner owner2) {
+      return (int)
+          (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
+              getBalanceValue(ownerStatMap.get(owner2), balanceValue));
+    }
+  }
+
+  /**
+   * Structure to keep track of how much value a {@link WorkerInfo} has
+   * been assigned.
+   */
+  private static class WorkerInfoAssignments implements
+      Comparable<WorkerInfoAssignments> {
+    /** Worker info associated */
+    private final WorkerInfo workerInfo;
+    /** Balance value */
+    private final BalanceValue balanceValue;
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
+    /** Current value of this object */
+    private long value = 0;
+
+    /**
+     * Constructor with final values.
+     *
+     * @param workerInfo Worker info for assignment.
+     * @param balanceValue Value used to balance.
+     * @param ownerStatsMap Map of owner to stats.
+     */
+    public WorkerInfoAssignments(
+        WorkerInfo workerInfo,
+        BalanceValue balanceValue,
+        Map<PartitionOwner, PartitionStats> ownerStatsMap) {
+      this.workerInfo = workerInfo;
+      this.balanceValue = balanceValue;
+      this.ownerStatsMap = ownerStatsMap;
+    }
+
+    /**
+     * Get the total value of all partitions assigned to this worker.
+     *
+     * @return Total value of all partition assignments.
+     */
+    public long getValue() {
+      return value;
+    }
+
+    /**
+     * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
+     *
+     * @param partitionOwner PartitionOwner to assign.
+     */
+    public void assignPartitionOwner(
+        PartitionOwner partitionOwner) {
+      value += getBalanceValue(ownerStatsMap.get(partitionOwner),
+          balanceValue);
+      if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
+        partitionOwner.setPreviousWorkerInfo(
+            partitionOwner.getWorkerInfo());
+        partitionOwner.setWorkerInfo(workerInfo);
+      } else {
+        partitionOwner.setPreviousWorkerInfo(null);
+      }
+    }
+
+    @Override
+    public int compareTo(WorkerInfoAssignments other) {
+      return (int)
+          (getValue() - ((WorkerInfoAssignments) other).getValue());
+    }
+  }
+
+  /**
+   * Balance the partitions with an algorithm based on a value.
+   *
+   * @param conf Configuration to find the algorithm
+   * @param partitionOwners All the owners of all partitions
+   * @param allPartitionStats All the partition stats
+   * @param availableWorkerInfos All the available workers
+   * @return Balanced partition owners
+   */
+  public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
+      Configuration conf,
+      Collection<PartitionOwner> partitionOwners,
+      Collection<PartitionStats> allPartitionStats,
+      Collection<WorkerInfo> availableWorkerInfos) {
+
+    String balanceAlgorithm =
+        conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
+          balanceAlgorithm);
+    }
+    BalanceValue balanceValue = BalanceValue.UNSET;
+    if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
+      return partitionOwners;
+    } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.EDGES;
+    } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.VERTICES;
+    } else {
+      throw new IllegalArgumentException(
+          "balancePartitionsAcrossWorkers: Illegal balance " +
+              "algorithm - " + balanceAlgorithm);
+    }
+
+    // Join the partition stats and partition owners by partition id
+    Map<Integer, PartitionStats> idStatMap =
+        new HashMap<Integer, PartitionStats>();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
+          null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition id " +
+                "for " + partitionStats);
+      }
+    }
+    Map<PartitionOwner, PartitionStats> ownerStatsMap =
+        new HashMap<PartitionOwner, PartitionStats>();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      PartitionStats partitionStats =
+          idStatMap.get(partitionOwner.getPartitionId());
+      if (partitionStats == null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Missing partition " +
+                "stats for " + partitionOwner);
+      }
+      if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition " +
+                "owner " + partitionOwner);
+      }
+    }
+    if (ownerStatsMap.size() != partitionOwners.size()) {
+      throw new IllegalStateException(
+          "balancePartitionsAcrossWorkers: ownerStats count = " +
+              ownerStatsMap.size() + ", partitionOwners count = " +
+              partitionOwners.size() + " and should match.");
+    }
+
+    List<WorkerInfoAssignments> workerInfoAssignmentsList =
+        new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
+    for (WorkerInfo workerInfo : availableWorkerInfos) {
+      workerInfoAssignmentsList.add(
+          new WorkerInfoAssignments(
+              workerInfo, balanceValue, ownerStatsMap));
+    }
+
+    // A simple heuristic for balancing the partitions across the workers
+    // using a value (edges, vertices).  An improvement would be to
+    // take into account the already existing partition worker assignments.
+    // 1.  Sort the partitions by size
+    // 2.  Place the workers in a min heap sorted by their total balance
+    //     value.
+    // 3.  From largest partition to the smallest, take the partition
+    //     worker at the top of the heap, add the partition to it, and
+    //     then put it back in the heap
+    List<PartitionOwner> partitionOwnerList =
+        new ArrayList<PartitionOwner>(partitionOwners);
+    Collections.sort(partitionOwnerList,
+        Collections.reverseOrder(
+            new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
+    PriorityQueue<WorkerInfoAssignments> minQueue =
+        new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      WorkerInfoAssignments chosenWorker = minQueue.remove();
+      chosenWorker.assignPartitionOwner(partitionOwner);
+      minQueue.add(chosenWorker);
+    }
+
+    return partitionOwnerList;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
new file mode 100644
index 0000000..1b2be9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.graph.WorkerInfo;
+
+/**
+ * Captures one worker's side of a potential partition exchange: the workers
+ * it has to wait for and the partitions it has to send out.
+ */
+public class PartitionExchange {
+  /** Workers this worker must wait for before it can continue */
+  private final Set<WorkerInfo> myDependencyWorkerSet;
+  /** Partition ids to send, keyed by the worker they are sent to */
+  private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
+
+  /**
+   * Only constructor.  Both collections are stored as given (not copied).
+   *
+   * @param myDependencyWorkerSet All the workers I must wait for
+   * @param sendWorkerPartitionMap Partitions I need to send to other workers
+   */
+  public PartitionExchange(
+      Set<WorkerInfo> myDependencyWorkerSet,
+      Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
+    this.sendWorkerPartitionMap = sendWorkerPartitionMap;
+    this.myDependencyWorkerSet = myDependencyWorkerSet;
+  }
+
+  /**
+   * Workers whose work must arrive before this worker can continue.
+   *
+   * @return Set of workers I must wait for
+   */
+  public Set<WorkerInfo> getMyDependencyWorkerSet() {
+    return this.myDependencyWorkerSet;
+  }
+
+  /**
+   * Partition ids this worker has to send, grouped by destination worker.
+   *
+   * @return Mapping of worker to partition id list I will send to.
+   */
+  public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
+    return this.sendWorkerPartitionMap;
+  }
+
+  /**
+   * Does this worker take part in the exchange, as receiver or as sender?
+   *
+   * @return True if needs to be involved in the exchange, false otherwise.
+   */
+  public boolean doExchange() {
+    return !(myDependencyWorkerSet.isEmpty() &&
+        sendWorkerPartitionMap.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
new file mode 100644
index 0000000..5616a8d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Metadata about ownership of a partition: its id, the current and previous
+ * owning workers, and the checkpoint files prefix used after a restart.
+ */
+public interface PartitionOwner extends Writable {
+  /**
+   * Get the partition id that maps to the relevant {@link Partition} object.
+   *
+   * @return Partition id
+   */
+  int getPartitionId();
+
+  /**
+   * Get the information of the worker that is currently responsible for
+   * the partition id.
+   *
+   * @return Owning worker information.
+   */
+  WorkerInfo getWorkerInfo();
+
+  /**
+   * Set the worker that is currently responsible for the partition.
+   *
+   * @param workerInfo Worker info responsible for partition
+   */
+  void setWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * Get the information of the worker that was previously responsible for
+   * the partition id.
+   *
+   * @return Owning worker information or null if no previous worker info.
+   */
+  WorkerInfo getPreviousWorkerInfo();
+
+  /**
+   * Set the worker that was previously responsible for the partition.
+   *
+   * @param workerInfo Worker info previously responsible for the partition
+   */
+  void setPreviousWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * If this is a restarted checkpoint, the worker will use this information
+   * to determine where the checkpointed partition was stored on HDFS.
+   *
+   * @return Prefix of the checkpoint HDFS files for this partition, null if
+   *         this is not a restarted superstep.
+   */
+  String getCheckpointFilesPrefix();
+
+  /**
+   * Set the checkpoint files prefix.  Master uses this.
+   *
+   * @param checkpointFilesPrefix HDFS checkpoint file prefix
+   */
+  void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
new file mode 100644
index 0000000..6ee0228
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Bookkeeping for the statistics of a single {@link Partition}.  Holds only
+ * the counters, never the partition data itself.
+ */
+public class PartitionStats implements Writable {
+  /** Id of the partition these stats belong to */
+  private int partitionId = -1;
+  /** Number of vertices in this partition */
+  private long vertexCount;
+  /** Number of finished vertices in this partition */
+  private long finishedVertexCount;
+  /** Number of edges in this partition */
+  private long edgeCount;
+  /** Number of messages sent from this partition */
+  private long messagesSentCount;
+
+  /**
+   * Default constructor for reflection.
+   */
+  public PartitionStats() { }
+
+  /**
+   * Constructor with the initial stats.
+   *
+   * @param partitionId Partition id.
+   * @param vertexCount Vertex count.
+   * @param finishedVertexCount Finished vertex count.
+   * @param edgeCount Edge count.
+   * @param messagesSentCount Number of messages sent
+   */
+  public PartitionStats(int partitionId, long vertexCount,
+      long finishedVertexCount, long edgeCount, long messagesSentCount) {
+    this.partitionId = partitionId;
+    this.vertexCount = vertexCount;
+    this.finishedVertexCount = finishedVertexCount;
+    this.edgeCount = edgeCount;
+    this.messagesSentCount = messagesSentCount;
+  }
+
+  /**
+   * Change the id of the partition these stats describe.
+   *
+   * @param partitionId New partition id.
+   */
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Id of the partition these stats describe.
+   *
+   * @return Partition id.
+   */
+  public int getPartitionId() {
+    return this.partitionId;
+  }
+
+  /**
+   * Count one more vertex.
+   */
+  public void incrVertexCount() {
+    vertexCount += 1;
+  }
+
+  /**
+   * Number of vertices counted so far.
+   *
+   * @return Vertex count.
+   */
+  public long getVertexCount() {
+    return this.vertexCount;
+  }
+
+  /**
+   * Count one more finished vertex.
+   */
+  public void incrFinishedVertexCount() {
+    finishedVertexCount += 1;
+  }
+
+  /**
+   * Number of finished vertices counted so far.
+   *
+   * @return Finished vertex count.
+   */
+  public long getFinishedVertexCount() {
+    return this.finishedVertexCount;
+  }
+
+  /**
+   * Increase the edge count by the given amount.
+   *
+   * @param edgeCount Number of edges to add.
+   */
+  public void addEdgeCount(long edgeCount) {
+    this.edgeCount = this.edgeCount + edgeCount;
+  }
+
+  /**
+   * Number of edges counted so far.
+   *
+   * @return Edge count.
+   */
+  public long getEdgeCount() {
+    return this.edgeCount;
+  }
+
+  /**
+   * Increase the messages sent count by the given amount.
+   *
+   * @param messagesSentCount Number of messages to add.
+   */
+  public void addMessagesSentCount(long messagesSentCount) {
+    this.messagesSentCount = this.messagesSentCount + messagesSentCount;
+  }
+
+  /**
+   * Number of sent messages counted so far.
+   *
+   * @return Messages sent count.
+   */
+  public long getMessagesSentCount() {
+    return this.messagesSentCount;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    vertexCount = input.readLong();
+    finishedVertexCount = input.readLong();
+    edgeCount = input.readLong();
+    messagesSentCount = input.readLong();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeLong(vertexCount);
+    output.writeLong(finishedVertexCount);
+    output.writeLong(edgeCount);
+    output.writeLong(messagesSentCount);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("(id=").append(partitionId);
+    sb.append(",vtx=").append(vertexCount);
+    sb.append(",finVtx=").append(finishedVertexCount);
+    sb.append(",edges=").append(edgeCount);
+    sb.append(",msgsSent=").append(messagesSentCount);
+    return sb.append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
new file mode 100644
index 0000000..07f55ed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Structure that stores partitions for a worker.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class PartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+
+  /**
+   * Add a new partition to the store or just the vertices from the partition
+   * to the old partition.
+   *
+   * @param partition Partition to add
+   */
+  public abstract void addPartition(Partition<I, V, E, M> partition);
+
+  /**
+   * Get a partition.
+   *
+   * @param partitionId Partition id
+   * @return The requested partition
+   */
+  public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
+
+  /**
+   * Remove a partition and return it.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition
+   */
+  public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
+
+  /**
+   * Just delete a partition
+   * (more efficient than {@link #removePartition(Integer partitionID)} if the
+   * partition is out of core).
+   *
+   * @param partitionId Partition id
+   */
+  public abstract void deletePartition(Integer partitionId);
+
+  /**
+   * Whether a specific partition is present in the store.
+   *
+   * @param partitionId Partition id
+   * @return True iff the partition is present
+   */
+  public abstract boolean hasPartition(Integer partitionId);
+
+  /**
+   * Return the ids of all the stored partitions as an Iterable.
+   *
+   * @return The partition ids
+   */
+  public abstract Iterable<Integer> getPartitionIds();
+
+  /**
+   * Return the number of stored partitions.
+   *
+   * @return The number of partitions
+   */
+  public abstract int getNumPartitions();
+
+  /**
+   * Whether the partition store is empty.
+   *
+   * @return True iff there are no partitions in the store
+   */
+  public boolean isEmpty() {
+    return getNumPartitions() == 0;
+  }
+
+  /**
+   * Return all the stored partitions as an Iterable.  Note that this may force
+   * out-of-core partitions to be loaded into memory if using out-of-core.
+   *
+   * @return The partition objects
+   */
+  public Iterable<Partition<I, V, E, M>> getPartitions() {
+    return Iterables.transform(getPartitionIds(),
+        new Function<Integer, Partition<I, V, E, M>>() {
+          @Override
+          public Partition<I, V, E, M> apply(Integer partitionId) {
+            return getPartition(partitionId);
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
new file mode 100644
index 0000000..5600dad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper class for {@link Partition} related operations.
+ */
+public class PartitionUtils {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(PartitionUtils.class);
+
+  /**
+   * Do not construct this object.
+   */
+  private PartitionUtils() { }
+
+  /**
+   * Orders worker entries by edge count (overflow-safe long comparison).
+   */
+  private static class EdgeCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return Long.valueOf(worker1.getValue().getEdgeCount()).compareTo(
+          Long.valueOf(worker2.getValue().getEdgeCount()));
+    }
+  }
+
+  /**
+   * Orders worker entries by vertex count.  Compares the long counts
+   * directly instead of truncating their difference to an int.
+   */
+  private static class VertexCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return Long.valueOf(worker1.getValue().getVertexCount()).compareTo(
+          Long.valueOf(worker2.getValue().getVertexCount()));
+    }
+  }
+
+  /**
+   * Check for imbalances on a per worker basis, by calculating the
+   * mean, high and low workers by edges and vertices.  The result is only
+   * logged (at info level); nothing is logged when there are no stats.
+   *
+   * @param partitionOwnerList List of partition owners.
+   * @param allPartitionStats All the partition stats.
+   * @throws IllegalStateException if two owners share a partition id
+   */
+  public static void analyzePartitionStats(
+      Collection<PartitionOwner> partitionOwnerList,
+      List<PartitionStats> allPartitionStats) {
+    Map<Integer, PartitionOwner> idOwnerMap =
+        new HashMap<Integer, PartitionOwner>();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      if (idOwnerMap.put(partitionOwner.getPartitionId(),
+          partitionOwner) != null) {
+        throw new IllegalStateException(
+            "analyzePartitionStats: Duplicate partition " +
+                partitionOwner);
+      }
+    }
+
+    // Sum up the vertices and edges per owning worker and overall
+    Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+    VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      long vertices = partitionStats.getVertexCount();
+      long edges = partitionStats.getEdgeCount();
+      WorkerInfo workerInfo =
+          idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
+      VertexEdgeCount vertexEdgeCount = workerStatsMap.get(workerInfo);
+      if (vertexEdgeCount == null) {
+        // First partition seen for this worker
+        workerStatsMap.put(workerInfo, new VertexEdgeCount(vertices, edges));
+      } else {
+        workerStatsMap.put(workerInfo,
+            vertexEdgeCount.incrVertexEdgeCount(vertices, edges));
+      }
+      totalVertexEdgeCount =
+          totalVertexEdgeCount.incrVertexEdgeCount(vertices, edges);
+    }
+
+    List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+        Lists.newArrayList(workerStatsMap.entrySet());
+
+    // No stats means no workers: skip the report rather than divide by
+    // zero for the mean or index into an empty list for min/max.
+    if (LOG.isInfoEnabled() && !workerEntryList.isEmpty()) {
+      // Ascending sort puts the min worker first and the max worker last
+      Collections.sort(workerEntryList, new VertexCountComparator());
+      LOG.info("analyzePartitionStats: Vertices - Mean: " +
+          (totalVertexEdgeCount.getVertexCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getVertexCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getVertexCount());
+      Collections.sort(workerEntryList, new EdgeCountComparator());
+      LOG.info("analyzePartitionStats: Edges - Mean: " +
+          (totalVertexEdgeCount.getEdgeCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getEdgeCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getEdgeCount());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
new file mode 100644
index 0000000..8e417ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Some functionality is provided, but this is meant for developers to
+ * determine the partitioning based on the actual types of data.  The
+ * implementation of several methods are left to the developer who is trying
+ * to control the amount of messages sent from one worker to another.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new RangePartitionStats<I>();
+  }
+}


Mime
View raw message