giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [8/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
new file mode 100644
index 0000000..4281bf5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access and change aggregators through this interface
+ */
+public interface MasterAggregatorUsage {
+  /**
+   * Register an aggregator in preSuperstep() and/or preApplication(). This
+   * aggregator will have its value reset at the end of each super step.
+   *
+   * @param name of aggregator
+   * @param aggregatorClass Class type of the aggregator
+   * @param <A> Aggregator type
+   * @return True iff aggregator wasn't already registered
+   * @throws InstantiationException if the aggregator class cannot be
+   *         instantiated
+   * @throws IllegalAccessException if the aggregator class or its
+   *         constructor is not accessible
+   */
+  <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException;
+
+  /**
+   * Register persistent aggregator in preSuperstep() and/or
+   * preApplication(). This aggregator will not reset value at the end of
+   * super step.
+   *
+   * @param name of aggregator
+   * @param aggregatorClass Class type of the aggregator
+   * @param <A> Aggregator type
+   * @return True iff aggregator wasn't already registered
+   * @throws InstantiationException if the aggregator class cannot be
+   *         instantiated
+   * @throws IllegalAccessException if the aggregator class or its
+   *         constructor is not accessible
+   */
+  <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException;
+
+  /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  <A extends Writable> A getAggregatedValue(String name);
+
+  /**
+   * Sets value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param value Value to set
+   * @param <A> Aggregated value
+   */
+  <A extends Writable> void setAggregatedValue(String name, A value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
new file mode 100644
index 0000000..cdb9e85
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.graph.GraphState;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Abstract class for defining a master computation that can perform
+ * centralized work between supersteps. This class will be instantiated on the
+ * master node and will run every superstep before the workers do.
+ *
+ * Communication with the workers should be performed via aggregators. The
+ * values of the aggregators are broadcast to the workers before
+ * vertex.compute() is called and collected by the master before
+ * master.compute() is called. This means aggregator values used by the workers
+ * are consistent with aggregator values from the master from the same
+ * superstep and aggregator used by the master are consistent with aggregator
+ * values from the workers from the previous superstep. Note that the master
+ * has to register its own aggregators (it does not call {@link WorkerContext}
+ * functions), but it uses all aggregators by default, so useAggregator does
+ * not have to be called.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
+    ImmutableClassesGiraphConfigurable {
+  /** If true, do not do anymore computation on this vertex. */
+  private boolean halt = false;
+  /** Global graph state **/
+  private GraphState graphState;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Must be defined by user to specify what the master has to do.
+   * Called once per superstep on the master, before the workers run.
+   */
+  public abstract void compute();
+
+  /**
+   * Initialize the MasterCompute class, this is the place to register
+   * aggregators.
+   *
+   * @throws InstantiationException if an aggregator class cannot be
+   *         instantiated during registration
+   * @throws IllegalAccessException if an aggregator class is not
+   *         accessible during registration
+   */
+  public abstract void initialize() throws InstantiationException,
+    IllegalAccessException;
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return getGraphState().getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return getGraphState().getTotalNumEdges();
+  }
+
+  /**
+   * After this is called, the computation will stop, even if there are
+   * still messages in the system or vertices that have not voted to halt.
+   */
+  public void haltComputation() {
+    halt = true;
+  }
+
+  /**
+   * Has the master halted?
+   *
+   * @return True if halted, false otherwise.
+   */
+  public boolean isHalted() {
+    return halt;
+  }
+
+  /**
+   * Get the graph state for all workers.
+   * Package-private: only framework code should reach the raw graph state.
+   *
+   * @return Graph state for all workers
+   */
+  GraphState getGraphState() {
+    return graphState;
+  }
+
+  /**
+   * Set the graph state for all workers
+   * Package-private: called by the framework, not by user code.
+   *
+   * @param graphState Graph state for all workers
+   */
+  void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return getGraphState().getContext();
+  }
+
+  // All aggregator operations below delegate to the master aggregator
+  // handler owned by the graph mapper.
+
+  @Override
+  public final <A extends Writable> boolean registerAggregator(
+    String name, Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public final <A extends Writable> boolean registerPersistentAggregator(
+      String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        registerPersistentAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    // Explicit type witness <A> needed since the receiver is a raw chain.
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        <A>getAggregatedValue(name);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        setAggregatedValue(name, value);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
new file mode 100644
index 0000000..96e988c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.graph.TaskInfo;
+
+/**
+ * Information about the master that is sent to other workers.
+ */
+public class MasterInfo extends TaskInfo {
+  /**
+   * Constructor
+   * (no-arg; presumably required for reflective/Writable instantiation —
+   * TODO confirm against TaskInfo usage)
+   */
+  public MasterInfo() {
+  }
+
+  @Override
+  public String toString() {
+    return "Master(" + super.toString() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
new file mode 100644
index 0000000..ee749e1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.counters.GiraphTimers;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+/**
+ * Master thread that will coordinate the activities of the tasks.  It runs
+ * on all task processes, however, will only execute its algorithm if it knows
+ * it is the "leader" from ZooKeeper.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class MasterThread<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> extends Thread {
+  /** Counter group name for the Giraph timers */
+  public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(MasterThread.class);
+  /** Reference to shared BspService */
+  private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
+  /** Context (for counters) */
+  private final Context context;
+  /** Use superstep counters? */
+  private final boolean superstepCounterOn;
+  /** Setup seconds */
+  private double setupSecs = 0d;
+  /** Superstep timer (in seconds) map */
+  private final Map<Long, Double> superstepSecsMap =
+      new TreeMap<Long, Double>();
+
+  /**
+   * Constructor.
+   *
+   * @param bspServiceMaster Master that already exists and setup() has
+   *        been called.
+   * @param context Context from the Mapper.
+   */
+  public MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster,
+      Context context) {
+    super(MasterThread.class.getName());
+    this.bspServiceMaster = bspServiceMaster;
+    this.context = context;
+    GiraphTimers.init(context);
+    // Superstep counters can be disabled via configuration to reduce the
+    // per-superstep Hadoop counter footprint.
+    superstepCounterOn = context.getConfiguration().getBoolean(
+        GiraphConstants.USE_SUPERSTEP_COUNTERS,
+        GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
+  }
+
+  /**
+   * The master algorithm.  The algorithm should be able to withstand
+   * failures and resume as necessary since the master may switch during a
+   * job.
+   */
+  @Override
+  public void run() {
+    // Algorithm:
+    // 1. Become the master
+    // 2. If desired, restart from a manual checkpoint
+    // 3. Run all supersteps until complete
+    try {
+      long startMillis = System.currentTimeMillis();
+      // endMillis marks the end of the last coordinated superstep; it is
+      // used below to time the shutdown phase.
+      long endMillis = 0;
+      bspServiceMaster.setup();
+      if (bspServiceMaster.becomeMaster()) {
+        // Attempt to create InputSplits if necessary. Bail out if that fails.
+        if (bspServiceMaster.getRestartedSuperstep() !=
+            BspService.UNSET_SUPERSTEP ||
+            (bspServiceMaster.createVertexInputSplits() != -1 &&
+                bspServiceMaster.createEdgeInputSplits() != -1)) {
+          long setupMillis = System.currentTimeMillis() - startMillis;
+          GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
+          setupSecs = setupMillis / 1000.0d;
+          SuperstepState superstepState = SuperstepState.INITIAL;
+          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
+          // Coordinate supersteps one at a time until the whole
+          // application is done.
+          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
+            long startSuperstepMillis = System.currentTimeMillis();
+            cachedSuperstep = bspServiceMaster.getSuperstep();
+            GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
+            superstepState = bspServiceMaster.coordinateSuperstep();
+            long superstepMillis = System.currentTimeMillis() -
+                startSuperstepMillis;
+            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
+                superstepMillis / 1000.0d);
+            if (LOG.isInfoEnabled()) {
+              LOG.info("masterThread: Coordination of superstep " +
+                  cachedSuperstep + " took " +
+                  superstepMillis / 1000.0d +
+                  " seconds ended with state " + superstepState +
+                  " and is now on superstep " +
+                  bspServiceMaster.getSuperstep());
+            }
+            if (superstepCounterOn) {
+              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep).
+                increment(superstepMillis);
+            }
+
+            bspServiceMaster.postSuperstep();
+
+            // If a worker failed, restart from a known good superstep
+            if (superstepState == SuperstepState.WORKER_FAILURE) {
+              bspServiceMaster.restartFromCheckpoint(
+                  bspServiceMaster.getLastGoodCheckpoint());
+            }
+            endMillis = System.currentTimeMillis();
+          }
+          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
+        }
+      }
+      bspServiceMaster.cleanup();
+      // An empty map means this task never coordinated a superstep (it did
+      // not become master, or input split creation failed) — skip the
+      // timing summary in that case.
+      if (!superstepSecsMap.isEmpty()) {
+        GiraphTimers.getInstance().getShutdownMs().
+          increment(System.currentTimeMillis() - endMillis);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Took " + setupSecs + " seconds.");
+        }
+        for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
+          if (LOG.isInfoEnabled()) {
+            if (entry.getKey().longValue() ==
+                BspService.INPUT_SUPERSTEP) {
+              LOG.info("input superstep: Took " +
+                  entry.getValue() + " seconds.");
+            } else {
+              LOG.info("superstep " + entry.getKey() + ": Took " +
+                  entry.getValue() + " seconds.");
+            }
+          }
+          // Report liveness while logging the (possibly long) summary.
+          context.progress();
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("shutdown: Took " +
+              (System.currentTimeMillis() - endMillis) /
+              1000.0d + " seconds.");
+          LOG.info("total: Took " +
+              ((System.currentTimeMillis() - startMillis) /
+              1000.0d) + " seconds.");
+        }
+        GiraphTimers.getInstance().getTotalMs().
+          increment(System.currentTimeMillis() - startMillis);
+      }
+      bspServiceMaster.postApplication();
+      // CHECKSTYLE: stop IllegalCatchCheck
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatchCheck
+      // Clean up first, then fail the task loudly so Hadoop sees the error.
+      bspServiceMaster.failureCleanup(e);
+      LOG.error("masterThread: Master algorithm failed with " +
+          e.getClass().getSimpleName(), e);
+      throw new IllegalStateException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/package-info.java
index 3c223ee..056f0ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/package-info.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of all the observer related things.
+ * Package of all the master related things.
  */
 package org.apache.giraph.master;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
index f146820..6052fd8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.metrics;
 
-import org.apache.giraph.graph.BspServiceWorker;
+import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.graph.GraphMapper;
 
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
index f30f199..43e40d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
@@ -18,7 +18,7 @@
 package org.apache.giraph.metrics;
 
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.graph.BspService;
+import org.apache.giraph.bsp.BspService;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphTimerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphTimerContext.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphTimerContext.java
index 5c7942a..45d24d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphTimerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphTimerContext.java
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.metrics;
 
-import org.apache.giraph.utils.SystemTime;
-import org.apache.giraph.utils.Time;
-import org.apache.giraph.utils.Times;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
 
 import java.util.concurrent.TimeUnit;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
index dd9eef7..71aad31 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.metrics;
 
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.graph.BspService;
+import org.apache.giraph.bsp.BspService;
 
 import java.io.PrintStream;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 9db5107..8fec14d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.metrics;
 
-import org.apache.giraph.graph.BspServiceWorker;
+import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.graph.ComputeCallable;
 import org.apache.giraph.graph.GraphMapper;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
new file mode 100644
index 0000000..c1df04b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+
+/**
+ * Basic partition owner, can be subclassed for more complicated partition
+ * owner implementations.
+ */
+public class BasicPartitionOwner implements PartitionOwner,
+    ImmutableClassesGiraphConfigurable {
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Partition id */
+  private int partitionId = -1;
+  /** Owning worker information */
+  private WorkerInfo workerInfo;
+  /** Previous (if any) worker info */
+  private WorkerInfo previousWorkerInfo;
+  /** Checkpoint files prefix for this partition */
+  private String checkpointFilesPrefix;
+
+  /**
+   * Default constructor.
+   */
+  public BasicPartitionOwner() { }
+
+  /**
+   * Constructor with partition id and worker info.
+   * Leaves previous worker info and checkpoint prefix unset (null).
+   *
+   * @param partitionId Partition id of this partition.
+   * @param workerInfo Owner of the partition.
+   */
+  public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
+    this(partitionId, workerInfo, null, null);
+  }
+
+  /**
+   * Constructor with partition id and worker info.
+   *
+   * @param partitionId Partition id of this partition.
+   * @param workerInfo Owner of the partition.
+   * @param previousWorkerInfo Previous owner of this partition.
+   * @param checkpointFilesPrefix Prefix of the checkpoint files.
+   */
+  public BasicPartitionOwner(int partitionId,
+                             WorkerInfo workerInfo,
+                             WorkerInfo previousWorkerInfo,
+                             String checkpointFilesPrefix) {
+    this.partitionId = partitionId;
+    this.workerInfo = workerInfo;
+    this.previousWorkerInfo = previousWorkerInfo;
+    this.checkpointFilesPrefix = checkpointFilesPrefix;
+  }
+
+  @Override
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  @Override
+  public WorkerInfo getWorkerInfo() {
+    return workerInfo;
+  }
+
+  @Override
+  public void setWorkerInfo(WorkerInfo workerInfo) {
+    this.workerInfo = workerInfo;
+  }
+
+  @Override
+  public WorkerInfo getPreviousWorkerInfo() {
+    return previousWorkerInfo;
+  }
+
+  @Override
+  public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
+    this.previousWorkerInfo = workerInfo;
+  }
+
+  @Override
+  public String getCheckpointFilesPrefix() {
+    return checkpointFilesPrefix;
+  }
+
+  @Override
+  public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
+    this.checkpointFilesPrefix = checkpointFilesPrefix;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    // Field order and presence flags must stay in sync with write().
+    partitionId = input.readInt();
+    workerInfo = new WorkerInfo();
+    workerInfo.readFields(input);
+    boolean hasPreviousWorkerInfo = input.readBoolean();
+    if (hasPreviousWorkerInfo) {
+      previousWorkerInfo = new WorkerInfo();
+      previousWorkerInfo.readFields(input);
+    }
+    boolean hasCheckpointFilePrefix = input.readBoolean();
+    if (hasCheckpointFilePrefix) {
+      checkpointFilesPrefix = input.readUTF();
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    // A boolean presence flag precedes each optional field so that
+    // readFields() can tell whether it was serialized.
+    output.writeInt(partitionId);
+    workerInfo.write(output);
+    if (previousWorkerInfo != null) {
+      output.writeBoolean(true);
+      previousWorkerInfo.write(output);
+    } else {
+      output.writeBoolean(false);
+    }
+    if (checkpointFilesPrefix != null) {
+      output.writeBoolean(true);
+      output.writeUTF(checkpointFilesPrefix);
+    } else {
+      output.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
+        previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
new file mode 100644
index 0000000..d34af11
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.partition;
+
+import com.google.common.collect.MapMaker;
+
+import com.google.common.primitives.Ints;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array based partition.  Should reduce the amount of memory used since
+ * the entire graph is compressed into byte arrays.  Must guarantee, however,
+ * that only one thread at a time will call getVertex since it is a singleton.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ByteArrayPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /**
+   * Vertex map for this range (keyed by index).  Note that the byte[] is a
+   * serialized vertex with the first four bytes as the length of the vertex
+   * to read.
+   */
+  private ConcurrentMap<I, byte[]> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+  /** Representative vertex */
+  private Vertex<I, V, E, M> representativeVertex;
+  /** Use unsafe serialization */
+  private boolean useUnsafeSerialization;
+
+  /**
+   * Constructor for reflection.
+   */
+  public ByteArrayPartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    byte[] vertexData = vertexMap.get(vertexIndex);
+    if (vertexData == null) {
+      return null;
+    }
+    WritableUtils.readFieldsFromByteArrayWithSize(
+        vertexData, representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    byte[] vertexData =
+        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+    byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
+    if (oldVertexBytes == null) {
+      return null;
+    } else {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          oldVertexBytes, representativeVertex, useUnsafeSerialization);
+      return representativeVertex;
+    }
+  }
+
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    byte[] vertexBytes = vertexMap.remove(vertexIndex);
+    if (vertexBytes == null) {
+      return null;
+    }
+    WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+        representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    // Only work with other ByteArrayPartition instances
+    if (!(partition instanceof ByteArrayPartition)) {
+      throw new IllegalStateException("addPartition: Cannot add partition " +
+          "of type " + partition.getClass());
+    }
+
+    ByteArrayPartition<I, V, E, M> byteArrayPartition =
+        (ByteArrayPartition<I, V, E, M>) partition;
+    for (Map.Entry<I, byte[]> entry :
+        byteArrayPartition.vertexMap.entrySet()) {
+      vertexMap.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  @Override
+  public long getEdgeCount() {
+    long edges = 0;
+    for (byte[] vertexBytes : vertexMap.values()) {
+      WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+          representativeVertex, useUnsafeSerialization);
+      edges += representativeVertex.getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // Reuse the old buffer whenever possible
+    byte[] oldVertexData = vertexMap.get(vertex.getId());
+    if (oldVertexData != null) {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, oldVertexData, useUnsafeSerialization));
+    } else {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, useUnsafeSerialization));
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      entry.getKey().write(output);
+      // Note here that we are writing the size of the vertex data first
+      // as it is encoded in the first four bytes of the byte[]
+      int vertexDataSize;
+      if (useUnsafeSerialization) {
+        vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
+            0);
+      } else {
+        vertexDataSize = Ints.fromByteArray(entry.getValue());
+      }
+
+      output.writeInt(vertexDataSize);
+      output.write(entry.getValue(), 0, vertexDataSize);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    int size = input.readInt();
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).initialCapacity(
+        size).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+    for (int i = 0; i < size; ++i) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      I vertexId = conf.createVertexId();
+      vertexId.readFields(input);
+      int vertexDataSize = input.readInt();
+      byte[] vertexData = new byte[vertexDataSize];
+      input.readFully(vertexData);
+      if (vertexMap.put(vertexId, vertexData) != null) {
+        throw new IllegalStateException("readFields: Already saw vertex " +
+            vertexId);
+      }
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return new RepresentativeVertexIterator();
+  }
+
+  /**
+   * Iterator that deserializes a vertex from a byte array on the fly, using
+   * the same representative vertex object.
+   */
+  private class RepresentativeVertexIterator implements
+      Iterator<Vertex<I, V, E, M>> {
+    /** Iterator to the vertex values */
+    private Iterator<byte[]> vertexDataIterator =
+        vertexMap.values().iterator();
+
+    @Override
+    public boolean hasNext() {
+      return vertexDataIterator.hasNext();
+    }
+
+    @Override
+    public Vertex<I, V, E, M> next() {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          vertexDataIterator.next(), representativeVertex,
+          useUnsafeSerialization);
+      return representativeVertex;
+    }
+
+    @Override
+    public void remove() {
+      throw new IllegalAccessError("remove: This method is not supported.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
new file mode 100644
index 0000000..09e5d75
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A partition store that can possibly spill to disk.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedPartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(DiskBackedPartitionStore.class);
+  /** Map of partitions kept in memory. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>>
+  inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
+  /** Maximum number of partitions to keep in memory. */
+  private int maxInMemoryPartitions;
+  /** Map of partitions kept out-of-core. The values are partition sizes. */
+  private final ConcurrentMap<Integer, Integer> onDiskPartitions =
+      Maps.newConcurrentMap();
+  /** Directory on the local file system for storing out-of-core partitions. */
+  private final String basePath;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Slot for loading out-of-core partitions. */
+  private Partition<I, V, E, M> loadedPartition;
+  /** Locks for accessing and modifying partitions. */
+  private final ConcurrentMap<Integer, Lock> partitionLocks =
+      Maps.newConcurrentMap();
+  /** Context used to report progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param context Mapper context
+   */
+  public DiskBackedPartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.context = context;
+    // We must be able to hold at least one partition in memory
+    maxInMemoryPartitions = Math.max(1,
+        conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
+            GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+    basePath = conf.get("mapred.job.id", "Unknown Job") +
+        conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
+            GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+  }
+
+  /**
+   * Get the path to the file where a partition is stored.
+   *
+   * @param partitionId The partition
+   * @return The path to the given partition
+   */
+  private String getPartitionPath(Integer partitionId) {
+    return basePath + "/partition-" + partitionId;
+  }
+
+  /**
+   * Create a new lock for a partition, lock it, and return it. If already
+   * existing, return null.
+   *
+   * @param partitionId Partition id
+   * @return A newly created lock, or null if already present
+   */
+  private Lock createLock(Integer partitionId) {
+    Lock lock = new ReentrantLock(true);
+    lock.lock();
+    if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
+      return null;
+    }
+    return lock;
+  }
+
+  /**
+   * Get the lock for a partition id.
+   *
+   * @param partitionId Partition id
+   * @return The lock
+   */
+  private Lock getLock(Integer partitionId) {
+    return partitionLocks.get(partitionId);
+  }
+
+  /**
+   * Write a partition to disk.
+   *
+   * @param partition The partition object to write
+   * @throws java.io.IOException
+   */
+  private void writePartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    File file = new File(getPartitionPath(partition.getId()));
+    file.getParentFile().mkdirs();
+    file.createNewFile();
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file)));
+    for (Vertex<I, V, E, M> vertex : partition) {
+      vertex.write(outputStream);
+    }
+    outputStream.close();
+  }
+
+  /**
+   * Read a partition from disk.
+   *
+   * @param partitionId Id of the partition to read
+   * @return The partition object
+   * @throws IOException
+   */
+  private Partition<I, V, E, M> readPartition(Integer partitionId)
+    throws IOException {
+    Partition<I, V, E, M> partition =
+        conf.createPartition(partitionId, context);
+    File file = new File(getPartitionPath(partitionId));
+    DataInputStream inputStream = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file)));
+    // The on-disk entry records how many vertices were spilled
+    int numVertices = onDiskPartitions.get(partitionId);
+    for (int i = 0; i < numVertices; ++i) {
+      Vertex<I, V, E, M> vertex = conf.createVertex();
+      vertex.readFields(inputStream);
+      partition.putVertex(vertex);
+    }
+    inputStream.close();
+    file.delete();
+    return partition;
+  }
+
+  /**
+   * Append some vertices of another partition to an out-of-core partition.
+   *
+   * @param partition Partition to add
+   * @throws IOException
+   */
+  private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
+    throws IOException {
+    File file = new File(getPartitionPath(partition.getId()));
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file, true)));
+    for (Vertex<I, V, E, M> vertex : partition) {
+      vertex.write(outputStream);
+    }
+    outputStream.close();
+  }
+
+  /**
+   * Load an out-of-core partition in memory.
+   *
+   * @param partitionId Partition id
+   */
+  private void loadPartition(Integer partitionId) {
+    if (loadedPartition != null) {
+      if (loadedPartition.getId() == partitionId) {
+        return;
+      }
+      // Evict the currently loaded partition back to disk first
+      if (LOG.isInfoEnabled()) {
+        LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
+            " out of core with size " + loadedPartition.getVertexCount());
+      }
+      try {
+        writePartition(loadedPartition);
+        onDiskPartitions.put(loadedPartition.getId(),
+            (int) loadedPartition.getVertexCount());
+        loadedPartition = null;
+      } catch (IOException e) {
+        throw new IllegalStateException("loadPartition: failed writing " +
+            "partition " + loadedPartition.getId() + " to disk", e);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadPartition: loading partition " + partitionId +
+          " in memory");
+    }
+    try {
+      loadedPartition = readPartition(partitionId);
+    } catch (IOException e) {
+      // Preserve the underlying I/O failure as the cause
+      throw new IllegalStateException("loadPartition: failed reading " +
+          "partition " + partitionId + " from disk", e);
+    }
+  }
+
+  /**
+   * Add a new partition without requiring a lock.
+   *
+   * @param partition Partition to be added
+   */
+  private void addPartitionNoLock(Partition<I, V, E, M> partition) {
+    synchronized (inMemoryPartitions) {
+      // NOTE(review): with maxInMemoryPartitions == 1 this check spills
+      // every partition; the single in-memory slot is then provided by
+      // loadedPartition instead -- confirm this is the intended behavior.
+      if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
+        inMemoryPartitions.put(partition.getId(), partition);
+
+        return;
+      }
+    }
+    try {
+      writePartition(partition);
+      onDiskPartitions.put(partition.getId(),
+          (int) partition.getVertexCount());
+    } catch (IOException e) {
+      // Preserve the underlying I/O failure as the cause
+      throw new IllegalStateException("addPartition: failed writing " +
+          "partition " + partition.getId() + " to disk", e);
+    }
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    if (inMemoryPartitions.containsKey(partition.getId())) {
+      Partition<I, V, E, M> existingPartition =
+          inMemoryPartitions.get(partition.getId());
+      existingPartition.addPartition(partition);
+    } else if (onDiskPartitions.containsKey(partition.getId())) {
+      Lock lock = getLock(partition.getId());
+      lock.lock();
+      if (loadedPartition != null && loadedPartition.getId() ==
+          partition.getId()) {
+        loadedPartition.addPartition(partition);
+      } else {
+        try {
+          appendPartitionOutOfCore(partition);
+          onDiskPartitions.put(partition.getId(),
+              onDiskPartitions.get(partition.getId()) +
+                  (int) partition.getVertexCount());
+        } catch (IOException e) {
+          throw new IllegalStateException("addPartition: failed " +
+              "writing vertices to partition " + partition.getId() + " on disk",
+              e);
+        }
+      }
+      lock.unlock();
+    } else {
+      Lock lock = createLock(partition.getId());
+      if (lock != null) {
+        addPartitionNoLock(partition);
+        lock.unlock();
+      } else {
+        // Another thread is already creating the partition,
+        // so we make sure it's done before repeating the call.
+        lock = getLock(partition.getId());
+        lock.lock();
+        lock.unlock();
+        addPartition(partition);
+      }
+    }
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    if (inMemoryPartitions.containsKey(partitionId)) {
+      return inMemoryPartitions.get(partitionId);
+    } else if (onDiskPartitions.containsKey(partitionId)) {
+      loadPartition(partitionId);
+      return loadedPartition;
+    } else {
+      throw new IllegalStateException("getPartition: partition " +
+          partitionId + " does not exist");
+    }
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (onDiskPartitions.containsKey(partitionId)) {
+      Partition<I, V, E, M> partition;
+      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+        partition = loadedPartition;
+        loadedPartition = null;
+      } else {
+        try {
+          partition = readPartition(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("removePartition: failed reading " +
+              "partition " + partitionId + " from disk", e);
+        }
+      }
+      onDiskPartitions.remove(partitionId);
+      return partition;
+    } else {
+      return inMemoryPartitions.remove(partitionId);
+    }
+  }
+
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (inMemoryPartitions.containsKey(partitionId)) {
+      inMemoryPartitions.remove(partitionId);
+    } else {
+      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+        loadedPartition = null;
+      } else {
+        File file = new File(getPartitionPath(partitionId));
+        file.delete();
+      }
+      onDiskPartitions.remove(partitionId);
+    }
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return partitionLocks.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return Iterables.concat(inMemoryPartitions.keySet(),
+        onDiskPartitions.keySet());
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return partitionLocks.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
new file mode 100644
index 0000000..e2e04dd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Defines the partitioning framework for this application.  A factory
+ * produces the master-side and worker-side halves of the partitioner;
+ * implementations are configurable (the interface extends
+ * ImmutableClassesGiraphConfigurable).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface GraphPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    ImmutableClassesGiraphConfigurable {
+  /**
+   * Create the {@link MasterGraphPartitioner} used by the master.
+   * Instantiated once by the master and reused.
+   *
+   * @return Instantiated master graph partitioner
+   */
+  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+
+  /**
+   * Create the {@link WorkerGraphPartitioner} used by the worker.
+   * Instantiated once by every worker and reused.
+   *
+   * @return Instantiated worker graph partitioner
+   */
+  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
new file mode 100644
index 0000000..3e9a9c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Master will execute a hash based partitioning.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  /** Multiplier for the current workers squared */
+  public static final String PARTITION_COUNT_MULTIPLIER =
+    "hash.masterPartitionCountMultipler";
+  /** Default multiplier for current workers squared */
+  public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+  /** Overrides default partition count calculation if not -1 */
+  public static final String USER_PARTITION_COUNT =
+    "hash.userPartitionCount";
+  /** Default user partition count */
+  public static final int DEFAULT_USER_PARTITION_COUNT = -1;
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(HashMasterPartitioner.class);
+  /**
+   * ZooKeeper has a limit of the data in a single znode of 1 MB and
+   * each entry averages somewhat more than 300 bytes
+   */
+  private static final int MAX_PARTITIONS = 1024 * 1024 / 350;
+  /** Provided configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Specified partition count (overrides calculation) */
+  private final int userPartitionCount;
+  /** Partition count (calculated in createInitialPartitionOwners) */
+  private int partitionCount = -1;
+  /** Save the last generated partition owner list */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration used.
+   */
+  public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
+        DEFAULT_USER_PARTITION_COUNT);
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    if (availableWorkerInfos.isEmpty()) {
+      throw new IllegalArgumentException(
+          "createInitialPartitionOwners: No available workers");
+    }
+    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
+    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+    if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
+      // No user override: multiplier * workers^2, but at least 1
+      float multiplier = conf.getFloat(
+          PARTITION_COUNT_MULTIPLIER,
+          DEFAULT_PARTITION_COUNT_MULTIPLIER);
+      partitionCount =
+          Math.max((int) (multiplier * availableWorkerInfos.size() *
+              availableWorkerInfos.size()),
+              1);
+    } else {
+      partitionCount = userPartitionCount;
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createInitialPartitionOwners: Creating " +
+        partitionCount + ", default would have been " +
+        (availableWorkerInfos.size() *
+         availableWorkerInfos.size()) + " partitions.");
+    }
+    if (partitionCount > MAX_PARTITIONS) {
+      // Cap the partition count so the owner list fits in a single znode
+      LOG.warn("createInitialPartitionOwners: " +
+          "Reducing the partitionCount to " + MAX_PARTITIONS +
+          " from " + partitionCount);
+      partitionCount = MAX_PARTITIONS;
+    }
+
+    // Assign owners to workers round-robin
+    for (int i = 0; i < partitionCount; ++i) {
+      PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
+      if (!workerIt.hasNext()) {
+        workerIt = availableWorkerInfos.iterator();
+      }
+      ownerList.add(owner);
+    }
+    this.partitionOwnerList = ownerList;
+    return ownerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  /**
+   * Subclasses can set the partition owner list.
+   *
+   * @param partitionOwnerList New partition owner list.
+   */
+  protected void setPartitionOwnerList(List<PartitionOwner>
+  partitionOwnerList) {
+    this.partitionOwnerList = partitionOwnerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkerInfos,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(
+        conf,
+        partitionOwnerList,
+        allPartitionStatsList,
+        availableWorkerInfos);
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
new file mode 100644
index 0000000..f7343a1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Divides the vertices into partitions by their hash code using a simple
+ * round-robin hash for great balancing if given a random hash code.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+  /**
+   * Saved configuration. Parameterized (rather than raw) to match
+   * HashRangePartitionerFactory and keep the factory type-safe.
+   */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
+
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    return new HashWorkerPartitioner<I, V, E, M>();
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
new file mode 100644
index 0000000..227e234
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Partitioner factory that assigns vertices to partitions by dividing
+ * the 32-bit hash space of the vertex id into contiguous ranges.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+  /** Configuration injected via {@link #setConf}. */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    // The master side reuses the generic hash master partitioner.
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
+
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    // The worker side performs the actual range lookup.
+    return new HashRangeWorkerPartitioner<I, V, E, M>();
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
new file mode 100644
index 0000000..a6e764d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.primitives.UnsignedInts;
+
+/**
+ * Implements range-based partitioning from the id hash code: the 32-bit
+ * unsigned hash space is split into contiguous ranges, one per partition.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashRangeWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashWorkerPartitioner<I, V, E, M> {
+  /** A transformed hashCode() must be strictly smaller than this. */
+  private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // Treat the 32-bit hash as an unsigned value in [0, HASH_LIMIT).
+    long unsignedHashCode = UnsignedInts.toLong(vertexId.hashCode());
+    // Scale down to a partition index: unsignedHashCode of HASH_LIMIT - 1
+    // yields index of size - 1, and unsignedHashCode of 0 yields index 0.
+    // Use partitionOwnerList for both size and lookup so a subclass
+    // overriding getPartitionOwners() cannot desynchronize the two.
+    int index = (int)
+        ((unsignedHashCode * partitionOwnerList.size()) / HASH_LIMIT);
+    return partitionOwnerList.get(index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
new file mode 100644
index 0000000..bb6e38d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implements hash-based partitioning from the id hash code: a vertex is
+ * assigned to partition (hashCode mod partition count).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class HashWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerGraphPartitioner<I, V, E, M> {
+  /**
+   * Mapping of the vertex ids to {@link PartitionOwner}.
+   */
+  protected List<PartitionOwner> partitionOwnerList =
+      Lists.newArrayList();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // Remainder first, then absolute value: the remainder's magnitude is
+    // always strictly less than the list size, so Math.abs() cannot
+    // overflow even for an Integer.MIN_VALUE hash code.
+    return partitionOwnerList.get(
+        Math.abs(vertexId.hashCode() % partitionOwnerList.size()));
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E, M> partitionStore) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E, M> partitionStore) {
+    partitionOwnerList.clear();
+    partitionOwnerList.addAll(masterSetPartitionOwners);
+
+    // Workers this worker must receive partitions from.
+    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+    // Partition ids this worker must send, keyed by destination worker.
+    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+        new HashMap<WorkerInfo, List<Integer>>();
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getPreviousWorkerInfo() == null) {
+        // Partition did not move; nothing to exchange.
+        continue;
+      } else if (partitionOwner.getWorkerInfo().equals(
+          myWorkerInfo) &&
+          partitionOwner.getPreviousWorkerInfo().equals(
+              myWorkerInfo)) {
+        throw new IllegalStateException(
+            "updatePartitionOwners: Impossible to have the same " +
+                "previous and current worker info " + partitionOwner +
+                " as me " + myWorkerInfo);
+      } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+        // Incoming partition: we depend on its previous owner sending it.
+        dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+      } else if (partitionOwner.getPreviousWorkerInfo().equals(
+          myWorkerInfo)) {
+        // Outgoing partition: record it under its new owner. Single map
+        // lookup instead of containsKey() followed by get().
+        List<Integer> partitionIds =
+            workerPartitionOwnerMap.get(partitionOwner.getWorkerInfo());
+        if (partitionIds == null) {
+          partitionIds = new ArrayList<Integer>();
+          workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+                                      partitionIds);
+        }
+        partitionIds.add(partitionOwner.getPartitionId());
+      }
+    }
+
+    return new PartitionExchange(dependentWorkerSet,
+        workerPartitionOwnerMap);
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
new file mode 100644
index 0000000..130ee07
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.worker.WorkerInfo;
+
+/**
+ * Determines how to divide the graph into partitions, how to manipulate
+ * partitions and then how to assign those partitions to workers.
+ * Instances are created on the master via
+ * GraphPartitionerFactory#createMasterGraphPartitioner().
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface MasterGraphPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Set some initial partition owners for the graph. Guaranteed to be called
+   * prior to the graph being loaded (initial or restart).
+   *
+   * @param availableWorkerInfos Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @return Collection of generated partition owners.
+   */
+  Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
+
+  /**
+   * After the worker stats have been merged to a single list, the master can
+   * use this information to send commands to the workers for any
+   * {@link Partition} changes. This protocol is specific to the
+   * {@link MasterGraphPartitioner} implementation.
+   *
+   * @param allPartitionStatsList All partition stats from all workers.
+   * @param availableWorkers Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @param superstep Partition owners will be set for this superstep
+   * @return Collection of {@link PartitionOwner} objects that changed from
+   *         the previous superstep, empty list if no change.
+   */
+  Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep);
+
+  /**
+   * Get current partition owners at this time.
+   * NOTE(review): presumably only meaningful after
+   * createInitialPartitionOwners() has run -- confirm with call sites.
+   *
+   * @return Collection of current {@link PartitionOwner} objects
+   */
+  Collection<PartitionOwner> getCurrentPartitionOwners();
+
+  /**
+   * Instantiate the {@link PartitionStats} implementation used to read the
+   * worker stats.
+   *
+   * @return Instantiated {@link PartitionStats} object
+   */
+  PartitionStats createPartitionStats();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
new file mode 100644
index 0000000..55ce8c0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A generic container that stores vertices.  Vertex ids will map to exactly
+ * one partition.  Implementations must also be {@link Writable} so
+ * partitions can be shipped between workers.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface Partition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
+    Iterable<Vertex<I, V, E, M>> {
+  /**
+   * Initialize the partition.  Guaranteed to be called before used.
+   *
+   * @param partitionId Partition id
+   * @param progressable Progressable to call progress
+   */
+  void initialize(int partitionId, Progressable progressable);
+
+  /**
+   * Get the vertex for this vertex index.
+   *
+   * @param vertexIndex Vertex index to search for
+   * @return Vertex if it exists, null otherwise
+   */
+  Vertex<I, V, E, M> getVertex(I vertexIndex);
+
+  /**
+   * Put a vertex into the Partition.
+   *
+   * @param vertex Vertex to put in the Partition
+   * @return Previously stored vertex with the same id, or null if none
+   *         existed prior
+   */
+  Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
+
+  /**
+   * Remove a vertex from the Partition.
+   *
+   * @param vertexIndex Vertex index to remove
+   * @return The removed vertex.
+   */
+  Vertex<I, V, E, M> removeVertex(I vertexIndex);
+
+  /**
+   * Add all of another partition's vertices to this partition.
+   *
+   * @param partition Partition to add
+   */
+  void addPartition(Partition<I, V, E, M> partition);
+
+  /**
+   * Get the number of vertices in this partition.
+   *
+   * @return Number of vertices
+   */
+  long getVertexCount();
+
+  /**
+   * Get the number of edges in this partition.
+   *
+   * @return Number of edges.
+   */
+  long getEdgeCount();
+
+  /**
+   * Get the partition id.
+   *
+   * @return Id of this partition.
+   */
+  int getId();
+
+  /**
+   * Set the partition id.
+   *
+   * @param id Id of this partition
+   */
+  void setId(int id);
+
+  /**
+   * Set the context used for reporting progress.
+   *
+   * @param progressable Progressable
+   */
+  void setProgressable(Progressable progressable);
+
+  /**
+   * Save potentially modified vertex back to the partition.
+   * NOTE(review): implementations backed by out-of-core storage presumably
+   * need this to persist changes -- confirm; in-memory ones may be no-ops.
+   *
+   * @param vertex Vertex to save
+   */
+  void saveVertex(Vertex<I, V, E, M> vertex);
+}


Mime
View raw message